from __future__ import annotations

import asyncio
from collections import ChainMap, defaultdict, deque
from dataclasses import asdict, dataclass
from os import getenv
from textwrap import dedent
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
    cast,
    get_args,
    overload,
)
from uuid import uuid4

from pydantic import BaseModel

from agno.agent.metrics import SessionMetrics
from agno.exceptions import ModelProviderError, StopAgentRun
from agno.knowledge.agent import AgentKnowledge
from agno.media import Audio, AudioArtifact, AudioResponse, File, Image, ImageArtifact, Video, VideoArtifact
from agno.memory.agent import AgentMemory, AgentRun
from agno.memory.v2.memory import Memory, SessionSummary
from agno.memory.v2.schema import UserMemory
from agno.models.base import Model
from agno.models.message import Citations, Message, MessageMetrics, MessageReferences
from agno.models.response import ModelResponse, ModelResponseEvent, ToolExecution
from agno.reasoning.step import NextAction, ReasoningStep, ReasoningSteps
from agno.run.base import RunResponseExtraData, RunStatus
from agno.run.messages import RunMessages
from agno.run.response import (
    RunEvent,
    RunResponse,
    RunResponseEvent,
    RunResponsePausedEvent,
)
from agno.run.team import TeamRunResponse, TeamRunResponseEvent
from agno.storage.base import Storage
from agno.storage.session.agent import AgentSession
from agno.tools.function import Function
from agno.tools.toolkit import Toolkit
from agno.utils.events import (
    create_memory_update_completed_event,
    create_memory_update_started_event,
    create_parser_model_response_completed_event,
    create_parser_model_response_started_event,
    create_reasoning_completed_event,
    create_reasoning_started_event,
    create_reasoning_step_event,
    create_run_response_cancelled_event,
    create_run_response_completed_event,
    create_run_response_content_event,
    create_run_response_continued_event,
    create_run_response_error_event,
    create_run_response_paused_event,
    create_run_response_started_event,
    create_tool_call_completed_event,
    create_tool_call_started_event,
)
from agno.utils.log import (
    log_debug,
    log_error,
    log_exception,
    log_info,
    log_warning,
    set_log_level_to_debug,
    set_log_level_to_info,
)
from agno.utils.message import get_text_from_message
from agno.utils.prompts import get_json_output_prompt, get_response_model_format_prompt
from agno.utils.response import (
    async_generator_wrapper,
    create_panel,
    create_paused_run_response_panel,
    escape_markdown_tags,
    format_tool_calls,
    generator_wrapper,
    get_paused_content,
)
from agno.utils.safe_formatter import SafeFormatter
from agno.utils.string import parse_response_model_str
from agno.utils.timer import Timer


@dataclass(init=False)
class Agent:
    # --- Agent settings ---
    # Model for this Agent
    model: Optional[Model] = None
    # Agent name
    name: Optional[str] = None
    # Agent UUID (autogenerated if not set)
    agent_id: Optional[str] = None
    # Agent introduction. This is added to the message history when a run is started.
    introduction: Optional[str] = None

    # --- User settings ---
    # Default user_id to use for this agent
    user_id: Optional[str] = None

    # --- Session settings ---
    # Default session_id to use for this agent (autogenerated if not set)
    session_id: Optional[str] = None
    # Session name
    session_name: Optional[str] = None
    # Session state (stored in the database to persist across runs)
    session_state: Optional[Dict[str, Any]] = None
    search_previous_sessions_history: Optional[bool] = False
    num_history_sessions: Optional[int] = None
    # If True, cache the session in memory
    cache_session: bool = True

    # --- Agent Context ---
    # Context available for tools and prompt functions
    context: Optional[Dict[str, Any]] = None
    # If True, add the context to the user prompt
    add_context: bool = False
    # If True, resolve the context (i.e. call any functions in the context) before running the agent
    resolve_context: bool = True

    # --- Agent Memory ---
    memory: Optional[Union[AgentMemory, Memory]] = None
    # Enable the agent to manage memories of the user
    enable_agentic_memory: bool = False
    # If True, the agent creates/updates user memories at the end of runs
    enable_user_memories: bool = False
    # If True, the agent adds a reference to the user memories in the response
    add_memory_references: Optional[bool] = None
    # If True, the agent creates/updates session summaries at the end of runs
    enable_session_summaries: bool = False
    # If True, the agent adds a reference to the session summaries in the response
    add_session_summary_references: Optional[bool] = None

    # --- Agent History ---
    # add_history_to_messages=true adds messages from the chat history to the messages list sent to the Model.
    add_history_to_messages: bool = False
    # Deprecated in favor of num_history_runs: Number of historical responses to add to the messages
    num_history_responses: Optional[int] = None
    # Number of historical runs to include in the messages
    num_history_runs: int = 3

    # --- Agent Knowledge ---
    knowledge: Optional[AgentKnowledge] = None
    # Enable RAG by adding references from AgentKnowledge to the user prompt.
    # Add knowledge_filters to the Agent class attributes
    knowledge_filters: Optional[Dict[str, Any]] = None
    # Let the agent choose the knowledge filters
    enable_agentic_knowledge_filters: Optional[bool] = False
    add_references: bool = False
    # Retrieval function to get references
    # This function, if provided, is used instead of the default search_knowledge function
    # Signature:
    # def retriever(agent: Agent, query: str, num_documents: Optional[int], **kwargs) -> Optional[list[dict]]:
    #     ...
    retriever: Optional[Callable[..., Optional[List[Union[Dict, str]]]]] = None
    references_format: Literal["json", "yaml"] = "json"

    # --- Agent Storage ---
    storage: Optional[Storage] = None
    # Extra data stored with this agent
    extra_data: Optional[Dict[str, Any]] = None

    # --- Agent Tools ---
    # A list of tools provided to the Model.
    # Tools are functions the model may generate JSON inputs for.
    tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]] = None
    # Show tool calls in Agent response.
    show_tool_calls: bool = True
    # Maximum number of tool calls allowed.
    tool_call_limit: Optional[int] = None
    # Controls which (if any) tool is called by the model.
    # "none" means the model will not call a tool and instead generates a message.
    # "auto" means the model can pick between generating a message or calling a tool.
    # Specifying a particular function via {"type": "function", "function": {"name": "my_function"}}
    #   forces the model to call that tool.
    # "none" is the default when no tools are present. "auto" is the default if tools are present.
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None

    # A function that acts as middleware and is called around tool calls.
    tool_hooks: Optional[List[Callable]] = None

    # --- Agent Reasoning ---
    # Enable reasoning by working through the problem step by step.
    reasoning: bool = False
    reasoning_model: Optional[Model] = None
    reasoning_agent: Optional[Agent] = None
    reasoning_min_steps: int = 1
    reasoning_max_steps: int = 10

    # --- Default tools ---
    # Add a tool that allows the Model to read the chat history.
    read_chat_history: bool = False
    # Add a tool that allows the Model to search the knowledge base (aka Agentic RAG)
    # Added only if knowledge is provided.
    search_knowledge: bool = True
    # Add a tool that allows the Model to update the knowledge base.
    update_knowledge: bool = False
    # Add a tool that allows the Model to get the tool call history.
    read_tool_call_history: bool = False

    # --- System message settings ---
    # Provide the system message as a string or function
    system_message: Optional[Union[str, Callable, Message]] = None
    # Role for the system message
    system_message_role: str = "system"
    # If True, create a default system message using agent settings and use that
    create_default_system_message: bool = True

    # --- Settings for building the default system message ---
    # A description of the Agent that is added to the start of the system message.
    description: Optional[str] = None
    # The goal of this task
    goal: Optional[str] = None
    # List of instructions for the agent.
    instructions: Optional[Union[str, List[str], Callable]] = None
    # Provide the expected output from the Agent.
    expected_output: Optional[str] = None
    # Additional context added to the end of the system message.
    additional_context: Optional[str] = None
    # If markdown=true, add instructions to format the output using markdown
    markdown: bool = False
    # If True, add the agent name to the instructions
    add_name_to_instructions: bool = False
    # If True, add the current datetime to the instructions to give the agent a sense of time
    # This allows for relative times like "tomorrow" to be used in the prompt
    add_datetime_to_instructions: bool = False
    # If True, add the current location to the instructions to give the agent a sense of place
    # This allows for location-aware responses and local context
    add_location_to_instructions: bool = False
    # Allows for custom timezone for datetime instructions following the TZ Database format (e.g. "Etc/UTC")
    timezone_identifier: Optional[str] = None
    # If True, add the session state variables in the user and system messages
    add_state_in_messages: bool = False

    # --- Extra Messages ---
    # A list of extra messages added after the system message and before the user message.
    # Use these for few-shot learning or to provide additional context to the Model.
    # Note: these are not retained in memory, they are added directly to the messages sent to the model.
    add_messages: Optional[List[Union[Dict, Message]]] = None
    success_criteria: Optional[str] = None
    # --- User message settings ---
    # Provide the user message as a string, list, dict, or function
    # Note: this will ignore the message sent to the run function
    user_message: Optional[Union[List, Dict, str, Callable, Message]] = None
    # Role for the user message
    user_message_role: str = "user"
    # If True, create a default user message using references and chat history
    create_default_user_message: bool = True

    # --- Agent Response Settings ---
    # Number of retries to attempt
    retries: int = 0
    # Delay between retries (in seconds)
    delay_between_retries: int = 1
    # Exponential backoff: if True, the delay between retries is doubled each time
    exponential_backoff: bool = False

    # --- Agent Response Model Settings ---
    # Provide a response model to get the response as a Pydantic model
    response_model: Optional[Type[BaseModel]] = None
    # Provide a secondary model to parse the response from the primary model
    parser_model: Optional[Model] = None
    # Provide a prompt for the parser model
    parser_model_prompt: Optional[str] = None
    # Provide an output model to structure the response from the main model
    output_model: Optional[Model] = None
    # Provide a prompt for the output model
    output_model_prompt: Optional[str] = None
    # If True, the response from the Model is converted into the response_model
    # Otherwise, the response is returned as a JSON string
    parse_response: bool = True
    # Use model enforced structured_outputs if supported (e.g. OpenAIChat)
    structured_outputs: Optional[bool] = None
    # If `response_model` is set, sets the response mode of the model, i.e. if the model should explicitly respond with a JSON object instead of a Pydantic model
    use_json_mode: bool = False
    # Save the response to a file
    save_response_to_file: Optional[str] = None

    # --- Agent Streaming ---
    # Stream the response from the Agent
    stream: Optional[bool] = None
    # Stream the intermediate steps from the Agent
    stream_intermediate_steps: bool = False

    # Persist the events on the run response
    store_events: bool = False
    events_to_skip: Optional[List[RunEvent]] = None

    # --- Agent Team ---
    # The team of agents that this agent can transfer tasks to.
    team: Optional[List[Agent]] = None
    team_data: Optional[Dict[str, Any]] = None
    # --- If this Agent is part of a team ---
    # If this Agent is part of a team, this is the role of the agent in the team
    role: Optional[str] = None
    # If this Agent is part of a team, this member agent will respond directly to the user
    # instead of passing the response to the leader agent
    respond_directly: bool = False
    # --- Transfer instructions ---
    # Add instructions for transferring tasks to team members
    add_transfer_instructions: bool = True
    # Separator between responses from the team
    team_response_separator: str = "\n"

    # Optional team session ID, set by the team leader agent.
    team_session_id: Optional[str] = None
    # Optional team ID. Indicates this agent is part of a team.
    team_id: Optional[str] = None

    # Optional app ID. Indicates this agent is part of an app.
    app_id: Optional[str] = None

    # Optional workflow ID. Indicates this agent is part of a workflow.
    workflow_id: Optional[str] = None
    # Set when this agent is part of a workflow.
    workflow_session_id: Optional[str] = None

    # Optional team session state. Set by the team leader agent.
    team_session_state: Optional[Dict[str, Any]] = None
    # Optional workflow session state. Set by the workflow.
    workflow_session_state: Optional[Dict[str, Any]] = None

    # --- Debug & Monitoring ---
    # Enable debug logs
    debug_mode: bool = False
    # Debug level: 1 = basic, 2 = detailed
    debug_level: Literal[1, 2] = 1

    # monitoring=True logs Agent information to agno.com for monitoring
    monitoring: bool = False
    # telemetry=True logs minimal telemetry for analytics
    # This helps us improve the Agent and provide better support
    telemetry: bool = True

    def __init__(
        self,
        *,
        model: Optional[Model] = None,
        name: Optional[str] = None,
        agent_id: Optional[str] = None,
        introduction: Optional[str] = None,
        user_id: Optional[str] = None,
        app_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_name: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        search_previous_sessions_history: Optional[bool] = False,
        num_history_sessions: Optional[int] = None,
        cache_session: bool = True,
        context: Optional[Dict[str, Any]] = None,
        add_context: bool = False,
        resolve_context: bool = True,
        memory: Optional[Union[AgentMemory, Memory]] = None,
        enable_agentic_memory: bool = False,
        enable_user_memories: bool = False,
        add_memory_references: Optional[bool] = None,
        enable_session_summaries: bool = False,
        add_session_summary_references: Optional[bool] = None,
        add_history_to_messages: bool = False,
        num_history_responses: Optional[int] = None,
        num_history_runs: int = 3,
        knowledge: Optional[AgentKnowledge] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        enable_agentic_knowledge_filters: Optional[bool] = None,
        add_references: bool = False,
        retriever: Optional[Callable[..., Optional[List[Union[Dict, str]]]]] = None,
        references_format: Literal["json", "yaml"] = "json",
        storage: Optional[Storage] = None,
        extra_data: Optional[Dict[str, Any]] = None,
        tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]] = None,
        show_tool_calls: bool = True,
        tool_call_limit: Optional[int] = None,
        tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
        tool_hooks: Optional[List[Callable]] = None,
        reasoning: bool = False,
        reasoning_model: Optional[Model] = None,
        reasoning_agent: Optional[Agent] = None,
        reasoning_min_steps: int = 1,
        reasoning_max_steps: int = 10,
        read_chat_history: bool = False,
        search_knowledge: bool = True,
        update_knowledge: bool = False,
        read_tool_call_history: bool = False,
        system_message: Optional[Union[str, Callable, Message]] = None,
        system_message_role: str = "system",
        create_default_system_message: bool = True,
        description: Optional[str] = None,
        goal: Optional[str] = None,
        success_criteria: Optional[str] = None,
        instructions: Optional[Union[str, List[str], Callable]] = None,
        expected_output: Optional[str] = None,
        additional_context: Optional[str] = None,
        markdown: bool = False,
        add_name_to_instructions: bool = False,
        add_datetime_to_instructions: bool = False,
        add_location_to_instructions: bool = False,
        timezone_identifier: Optional[str] = None,
        add_state_in_messages: bool = False,
        add_messages: Optional[List[Union[Dict, Message]]] = None,
        user_message: Optional[Union[List, Dict, str, Callable, Message]] = None,
        user_message_role: str = "user",
        create_default_user_message: bool = True,
        retries: int = 0,
        delay_between_retries: int = 1,
        exponential_backoff: bool = False,
        parser_model: Optional[Model] = None,
        parser_model_prompt: Optional[str] = None,
        response_model: Optional[Type[BaseModel]] = None,
        parse_response: bool = True,
        output_model: Optional[Model] = None,
        output_model_prompt: Optional[str] = None,
        structured_outputs: Optional[bool] = None,
        use_json_mode: bool = False,
        save_response_to_file: Optional[str] = None,
        stream: Optional[bool] = None,
        stream_intermediate_steps: bool = False,
        store_events: bool = False,
        events_to_skip: Optional[List[RunEvent]] = None,
        team: Optional[List[Agent]] = None,
        team_data: Optional[Dict[str, Any]] = None,
        role: Optional[str] = None,
        respond_directly: bool = False,
        add_transfer_instructions: bool = True,
        team_response_separator: str = "\n",
        debug_mode: bool = False,
        debug_level: Literal[1, 2] = 1,
        monitoring: bool = False,
        telemetry: bool = True,
    ):
        """Create an Agent from keyword-only settings.

        The class is declared with ``@dataclass(init=False)``, so this hand-written
        constructor is the one that runs. It copies each keyword argument onto the
        attribute of the same name, applies two normalizations, and initializes the
        per-run / per-session runtime state that callers are not expected to set.

        Normalizations:
            - ``events_to_skip=None`` becomes ``[RunEvent.run_response_content]``.
            - A ``debug_level`` other than 1 or 2 logs a warning and falls back to 1.

        NOTE(review): ``team_session_id``, ``team_id``, ``workflow_id``,
        ``workflow_session_id``, ``team_session_state`` and ``workflow_session_state``
        are not constructor arguments; until something assigns them, instances read the
        class-level ``None`` defaults declared on the class body.
        """
        self.model = model
        self.name = name
        self.agent_id = agent_id
        self.introduction = introduction
        self.user_id = user_id
        self.app_id = app_id

        self.session_id = session_id
        self.session_name = session_name
        self.session_state = session_state
        self.search_previous_sessions_history = search_previous_sessions_history
        self.num_history_sessions = num_history_sessions

        self.cache_session = cache_session

        self.context = context
        self.add_context = add_context
        self.resolve_context = resolve_context

        self.memory = memory
        self.enable_agentic_memory = enable_agentic_memory
        self.enable_user_memories = enable_user_memories
        # None here is resolved later by set_defaults() from the memory flags above.
        self.add_memory_references = add_memory_references
        self.enable_session_summaries = enable_session_summaries
        # None here is resolved later by set_defaults() from enable_session_summaries.
        self.add_session_summary_references = add_session_summary_references

        self.add_history_to_messages = add_history_to_messages
        # Deprecated alias; set_defaults() copies it into num_history_runs when not None.
        self.num_history_responses = num_history_responses
        self.num_history_runs = num_history_runs

        self.knowledge = knowledge
        self.knowledge_filters = knowledge_filters
        # NOTE(review): the constructor default is None while the class-level default is
        # False; both are falsy, but code testing `is False` would see a difference.
        self.enable_agentic_knowledge_filters = enable_agentic_knowledge_filters
        self.add_references = add_references
        self.retriever = retriever
        self.references_format = references_format

        self.storage = storage
        self.extra_data = extra_data

        self.tools = tools
        self.show_tool_calls = show_tool_calls
        self.tool_call_limit = tool_call_limit
        self.tool_choice = tool_choice
        self.tool_hooks = tool_hooks

        self.reasoning = reasoning
        self.reasoning_model = reasoning_model
        self.reasoning_agent = reasoning_agent
        self.reasoning_min_steps = reasoning_min_steps
        self.reasoning_max_steps = reasoning_max_steps

        self.read_chat_history = read_chat_history
        self.search_knowledge = search_knowledge
        self.update_knowledge = update_knowledge
        self.read_tool_call_history = read_tool_call_history

        self.system_message = system_message
        self.system_message_role = system_message_role
        self.create_default_system_message = create_default_system_message

        self.description = description
        self.goal = goal
        self.success_criteria = success_criteria
        self.instructions = instructions
        self.expected_output = expected_output
        self.additional_context = additional_context
        self.markdown = markdown
        self.add_name_to_instructions = add_name_to_instructions
        self.add_datetime_to_instructions = add_datetime_to_instructions
        self.add_location_to_instructions = add_location_to_instructions
        self.timezone_identifier = timezone_identifier
        self.add_state_in_messages = add_state_in_messages
        self.add_messages = add_messages

        self.user_message = user_message
        self.user_message_role = user_message_role
        self.create_default_user_message = create_default_user_message

        self.retries = retries
        self.delay_between_retries = delay_between_retries
        self.exponential_backoff = exponential_backoff
        self.parser_model = parser_model
        self.parser_model_prompt = parser_model_prompt
        self.response_model = response_model
        self.parse_response = parse_response
        self.output_model = output_model
        self.output_model_prompt = output_model_prompt

        self.structured_outputs = structured_outputs

        self.use_json_mode = use_json_mode
        self.save_response_to_file = save_response_to_file

        self.stream = stream
        self.stream_intermediate_steps = stream_intermediate_steps

        self.store_events = store_events
        # By default, we skip the run response content event
        # (a fresh list is built per instance, so agents never share this default).
        self.events_to_skip = events_to_skip
        if self.events_to_skip is None:
            self.events_to_skip = [RunEvent.run_response_content]

        self.team = team

        self.team_data = team_data
        self.role = role
        self.respond_directly = respond_directly
        self.add_transfer_instructions = add_transfer_instructions
        self.team_response_separator = team_response_separator

        self.debug_mode = debug_mode
        # Guard against values outside the declared Literal[1, 2] (not enforced at runtime).
        if debug_level not in [1, 2]:
            log_warning(f"Invalid debug level: {debug_level}. Setting to 1.")
            debug_level = 1
        self.debug_level = debug_level
        self.monitoring = monitoring
        self.telemetry = telemetry

        # --- Params not to be set by user ---
        self.session_metrics: Optional[SessionMetrics] = None

        # Per-run state; cleared by reset_run_state().
        self.run_id: Optional[str] = None
        self.run_input: Optional[Union[str, List, Dict, Message, BaseModel]] = None
        self.run_messages: Optional[RunMessages] = None
        self.run_response: Optional[RunResponse] = None

        # Images generated during this session
        self.images: Optional[List[ImageArtifact]] = None
        # Audio generated during this session
        self.audio: Optional[List[AudioArtifact]] = None
        # Videos generated during this session
        self.videos: Optional[List[VideoArtifact]] = None
        # NOTE(review): reset_session() also clears `self.files`, which is not initialized
        # here, so reading it before the first reset_session() would raise AttributeError.
        # Agent session
        self.agent_session: Optional[AgentSession] = None

        # Cached tool data for the model; _rebuild_tools starts True and is set back to
        # True by add_tool()/set_tools() whenever the tool list changes.
        self._tool_instructions: Optional[List[str]] = None
        self._tools_for_model: Optional[List[Dict[str, Any]]] = None
        self._functions_for_model: Optional[Dict[str, Function]] = None
        self._rebuild_tools: bool = True

        # Created lazily by initialize_agent().
        self._formatter: Optional[SafeFormatter] = None

        # Set by initialize_agent() once a user-supplied memory has been checked for the
        # one-time per-team deep copy.
        self._memory_deepcopy_done: bool = False

    def set_agent_id(self) -> str:
        if self.agent_id is None:
            self.agent_id = str(uuid4())
        return self.agent_id

    def set_debug(self) -> None:
        if self.debug_mode or getenv("AGNO_DEBUG", "false").lower() == "true":
            self.debug_mode = True
            set_log_level_to_debug(level=self.debug_level)
        else:
            set_log_level_to_info()

    def set_storage_mode(self) -> None:
        if self.storage is not None:
            if self.storage.mode in ["workflow", "team"]:
                log_warning(f"You shouldn't use storage in multiple modes. Current mode is {self.storage.mode}.")

            self.storage.mode = "agent"

    def set_monitoring(self) -> None:
        """Override monitoring and telemetry settings based on environment variables."""

        # Only override if the environment variable is set
        monitor_env = getenv("AGNO_MONITOR")
        if monitor_env is not None:
            self.monitoring = monitor_env.lower() == "true"

        telemetry_env = getenv("AGNO_TELEMETRY")
        if telemetry_env is not None:
            self.telemetry = telemetry_env.lower() == "true"

    def set_default_model(self) -> None:
        # Use the default Model (OpenAIChat) if no model is provided
        if self.model is None:
            try:
                from agno.models.openai import OpenAIChat
            except ModuleNotFoundError as e:
                log_exception(e)
                log_error(
                    "Agno agents use `openai` as the default model provider. "
                    "Please provide a `model` or install `openai`."
                )
                exit(1)

            log_info("Setting default model to OpenAI Chat")
            self.model = OpenAIChat(id="gpt-4o")

    def set_defaults(self) -> None:
        if self.add_memory_references is None:
            self.add_memory_references = self.enable_user_memories or self.enable_agentic_memory

        if self.add_session_summary_references is None:
            self.add_session_summary_references = self.enable_session_summaries

        if self.num_history_responses is not None:
            self.num_history_runs = self.num_history_responses

    def reset_session(self) -> None:
        self.session_state = None
        self.session_name = None
        self.session_metrics = None
        self.images = None
        self.videos = None
        self.audio = None
        self.files = None
        self.agent_session = None

    def reset_run_state(self) -> None:
        self.run_id = None
        self.run_input = None
        self.run_messages = None
        self.run_response = None

    def initialize_agent(self) -> None:
        self.set_defaults()
        self.set_default_model()
        self.set_storage_mode()
        self.set_debug()
        self.set_agent_id()

        log_debug(f"Agent ID: {self.agent_id}", center=True)

        if self.memory is None:
            self.memory = Memory()
        elif not self._memory_deepcopy_done:
            from copy import deepcopy

            # We store a copy of memory to ensure different team instances reference unique memory copy
            if isinstance(self.memory, Memory) and self.team_id is not None:
                self.memory = deepcopy(self.memory)
            self._memory_deepcopy_done = True

        # Default to the agent's model if no model is provided
        if isinstance(self.memory, Memory):
            if self.memory.model is None and self.model is not None:
                self.memory.set_model(self.model)

        if self._formatter is None:
            self._formatter = SafeFormatter()

    @property
    def has_team(self) -> bool:
        return self.team is not None and len(self.team) > 0

    @property
    def is_paused(self) -> bool:
        if self.run_response is not None and self.run_response.is_paused:
            return True
        return False

    @property
    def should_parse_structured_output(self) -> bool:
        return self.response_model is not None and self.parse_response and self.parser_model is None

    def add_tool(self, tool: Union[Toolkit, Callable, Function, Dict]):
        if not self.tools:
            self.tools = []
        self.tools.append(tool)
        self._rebuild_tools = True

    def set_tools(self, tools: List[Union[Toolkit, Callable, Function, Dict]]):
        self.tools = tools
        self._rebuild_tools = True

    def _initialize_session_state(self, user_id: Optional[str] = None, session_id: Optional[str] = None) -> None:
        self.session_state = self.session_state or {}
        if user_id is not None:
            self.session_state["current_user_id"] = user_id
            if self.team_session_state is not None:
                self.team_session_state["current_user_id"] = user_id
            if self.workflow_session_state is not None:
                self.workflow_session_state["current_user_id"] = user_id
        if session_id is not None:
            self.session_state["current_session_id"] = session_id
            if self.team_session_state is not None:
                self.team_session_state["current_session_id"] = session_id
            if self.workflow_session_state is not None:
                self.workflow_session_state["current_user_id"] = user_id

    def _initialize_session(
        self,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
    ) -> Tuple[str, Optional[str]]:
        """Initialize the session for the agent."""

        self.reset_run_state()

        # Determine the session_id
        if session_id is not None and session_id != "":
            # Reset session state if a session_id is provided. Session name and session state will be loaded from storage.
            # Only reset session state if the session_id is different from the current session_id
            if self.session_id is not None and session_id != self.session_id:
                self.reset_session()

            self.session_id = session_id
        else:
            if not (self.session_id is None or self.session_id == ""):
                session_id = self.session_id
            else:
                # Generate a new session_id and store it in the agent
                self.session_id = session_id = str(uuid4())

        # Use the default user_id when necessary
        if user_id is None or user_id == "":
            user_id = self.user_id

        # Determine the session_state
        if session_state is not None:
            self.session_state = session_state

        self._initialize_session_state(user_id=user_id, session_id=session_id)

        return session_id, user_id

    def _run(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        refresh_session_before_write: Optional[bool] = False,
    ) -> RunResponse:
        """Execute one non-streaming Agent run and return the populated RunResponse.

        Stages:
        1. Reasoning (when reasoning is enabled)
        2. Model call (including function calls), then the optional output and parser models
        3. Recording the run in memory -- returns early if any tool call paused the run
        4. Agent memory update
        5. Session metrics
        6. Persisting the session to storage
        7. Writing the response to a file when save_response_to_file is set
        """
        log_debug(f"Agent Run Start: {run_response.run_id}", center=True)

        # Stage 1: reasoning
        self._handle_reasoning(run_messages=run_messages)

        # Everything appended to run_messages.messages after this position belongs to
        # this run and is copied into the RunResponse and memory.
        user_message_cutoff = len(run_messages.messages)

        # Stage 2: model response (function calls are executed inside model.response)
        self.model = cast(Model, self.model)
        model_output: ModelResponse = self.model.response(
            messages=run_messages.messages,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
            response_format=response_format,
        )
        # Optional secondary models: output model first, then the parser model
        self._generate_response_with_output_model(model_output, run_messages)
        self._parse_response_with_parser_model(model_output, run_messages)
        self._update_run_response(model_response=model_output, run_response=run_response, run_messages=run_messages)

        # Stage 3: record the run in memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=user_message_cutoff,
        )

        # A paused tool call ends the run here; the paused handler builds the response
        run_is_paused = any(tool.is_paused for tool in run_response.tools or [])
        if run_is_paused:
            return self._handle_agent_run_paused(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )

        # Stage 4: drain the memory-update iterator so memory is updated before finishing
        for _ in self._update_memory(run_messages=run_messages, session_id=session_id, user_id=user_id):
            pass

        # Stage 5: session metrics
        self._set_session_metrics(run_messages)

        # Structured output conversion (if the agent requests it)
        self._convert_response_to_structured_format(run_response)

        # Stage 6: persist the session
        self.write_to_storage(user_id=user_id, session_id=session_id, refresh_session=refresh_session_before_write)

        # Stage 7: optionally save the response to a file
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        self._log_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")
        return run_response

    def _run_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
        refresh_session_before_write: Optional[bool] = False,
    ) -> Iterator[RunResponseEvent]:
        """Execute one streaming Agent run, yielding events as they are produced.

        Stages:
        1. Reasoning (when reasoning is enabled)
        2. Streaming model response (including function calls), then the optional
           output and parser models
        3. Recording the run in memory -- stops early if any tool call paused the run
        4. Agent memory update
        5. Session metrics
        6. Writing the response to a file when save_response_to_file is set
        7. Persisting the session to storage
        """
        log_debug(f"Agent Run Start: {run_response.run_id}", center=True)

        # Announce the run when intermediate steps are streamed
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_started_event(run_response), run_response)

        # Stage 1: reasoning
        yield from self._handle_reasoning_stream(run_messages=run_messages)

        # Everything appended to run_messages.messages after this position belongs to
        # this run and is copied into the RunResponse and memory.
        user_message_cutoff = len(run_messages.messages)

        # Stage 2: model response
        model_stream = self._handle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        )
        if self.output_model is None:
            for chunk in model_stream:
                yield chunk
        else:
            from agno.run.response import IntermediateRunResponseContentEvent, RunResponseContentEvent

            # With an output model configured, content from the main model is re-emitted
            # only as intermediate content (or dropped); the output model then generates output.
            for chunk in model_stream:
                if not isinstance(chunk, RunResponseContentEvent):
                    yield chunk
                elif stream_intermediate_steps:
                    yield IntermediateRunResponseContentEvent(
                        content=chunk.content,
                        content_type=chunk.content_type,
                    )

            # NOTE(review): the async counterpart also passes stream_intermediate_steps to
            # its output-model helper; confirm whether the sync helper should receive it too.
            yield from self._generate_response_with_output_model_stream(
                run_response=run_response, run_messages=run_messages
            )

        # Optional parser model structures the response separately
        yield from self._parse_response_with_parser_model_stream(
            run_response=run_response, stream_intermediate_steps=stream_intermediate_steps
        )

        # Stage 3: record the run in memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=user_message_cutoff,
        )

        # A paused tool call ends the stream after the paused handler's events
        run_is_paused = any(tool.is_paused for tool in run_response.tools or [])
        if run_is_paused:
            yield from self._handle_agent_run_paused_stream(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )
            return

        # Stage 4: memory update
        yield from self._update_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
            stream_intermediate_steps=stream_intermediate_steps,
        )

        # Stage 5: session metrics
        self._set_session_metrics(run_messages)

        # Stage 6: optionally save the response to a file
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # The completed event is emitted before the session is persisted
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_completed_event(from_run_response=run_response), run_response)

        # Stage 7: persist the session
        self.write_to_storage(user_id=user_id, session_id=session_id, refresh_session=refresh_session_before_write)

        self._log_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

    # Typing overload: with stream=False (the default), run() returns a single RunResponse.
    @overload
    def run(
        self,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        *,
        stream: Literal[False] = False,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        messages: Optional[Sequence[Union[Dict, Message]]] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        refresh_session_before_write: Optional[bool] = False,
        **kwargs: Any,
    ) -> RunResponse: ...

    # Typing overload: with stream=True, run() returns an iterator of RunResponseEvent.
    # `stream` deliberately has no default here. A call that omits `stream` then resolves
    # only to the non-streaming overload, instead of matching both overloads, which have
    # incompatible return types.
    @overload
    def run(
        self,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        *,
        stream: Literal[True],
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        messages: Optional[Sequence[Union[Dict, Message]]] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        refresh_session_before_write: Optional[bool] = False,
        **kwargs: Any,
    ) -> Iterator[RunResponseEvent]: ...

    def run(
        self,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        *,
        stream: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        messages: Optional[Sequence[Union[Dict, Message]]] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        refresh_session_before_write: Optional[bool] = False,
        **kwargs: Any,
    ) -> Union[RunResponse, Iterator[RunResponseEvent]]:
        """Run the Agent and return the response.

        Args:
            message: Input for this run (str, Message, dict, list or pydantic model).
            stream: Return an iterator of RunResponseEvent instead of a RunResponse.
                Defaults to ``self.stream`` (False when that is unset).
            stream_intermediate_steps: Also emit intermediate events while streaming.
                Defaults to ``self.stream_intermediate_steps``; forced to False when not streaming.
            user_id: User for this run; falls back to ``self.user_id``.
            session_id: Session for this run; falls back to ``self.session_id`` or a new UUID.
            session_state: When given, replaces ``self.session_state``.
            audio, images, videos, files: Media forwarded to ``get_run_messages``.
            messages: Additional messages forwarded to ``get_run_messages``.
            retries: Extra attempts after a ModelProviderError; defaults to ``self.retries``.
            knowledge_filters: Per-run filters, combined with ``self.knowledge_filters``
                via ``_get_effective_filters``.
            refresh_session_before_write: Forwarded to ``write_to_storage`` as ``refresh_session``.
            **kwargs: Forwarded to ``get_run_messages``.

        Returns:
            A RunResponse, or an iterator of RunResponseEvent when streaming.

        Raises:
            ModelProviderError: When every attempt fails and ``stream`` is False.
            Exception: When no attempt is made at all (``retries`` < 0) and ``stream`` is False.

        Note:
            ``_run_stream`` is a generator, so in streaming mode errors raised while the
            caller iterates the returned iterator are not covered by the retry loop.
        """
        import time

        session_id, user_id = self._initialize_session(
            session_id=session_id, user_id=user_id, session_state=session_state
        )

        # Initialize the Agent
        self.initialize_agent()

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        log_debug(f"Session ID: {session_id}", center=True)

        # Initialize Knowledge Filters
        effective_filters = knowledge_filters

        # When filters are passed manually
        if self.knowledge_filters or knowledge_filters:
            # Metadata filters are only populated when the knowledge base is loaded.
            # Initialize them here so filters work even when load() was never called.
            # Guard against a missing knowledge base instead of raising AttributeError.
            if self.knowledge is not None and not self.knowledge.valid_metadata_filters:
                self.knowledge.initialize_valid_filters()

            effective_filters = self._get_effective_filters(knowledge_filters)

        # Agentic filters are enabled: make sure metadata filters are initialized as well
        if (
            self.enable_agentic_knowledge_filters
            and self.knowledge is not None
            and not self.knowledge.valid_metadata_filters
        ):
            self.knowledge.initialize_valid_filters()

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False if self.stream_intermediate_steps is None else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        self.stream = self.stream or stream
        self.stream_intermediate_steps = self.stream_intermediate_steps or (stream_intermediate_steps and self.stream)

        # Resolve the agent context for this run
        if self.context is not None:
            self.resolve_run_context()

        # Prepare arguments for the model
        self.set_default_model()
        response_format = self._get_response_format() if self.parser_model is None else None
        self.model = cast(Model, self.model)

        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=False,
            knowledge_filters=effective_filters,
        )

        # Create a run_id for this specific run
        run_id = str(uuid4())

        # Create a new run_response for this run (shared across retry attempts)
        run_response = RunResponse(
            run_id=run_id,
            session_id=session_id,
            agent_id=self.agent_id,
            agent_name=self.name,
            team_session_id=self.team_session_id,
        )

        run_response.model = self.model.id if self.model is not None else None
        run_response.model_provider = self.model.provider if self.model is not None else None

        self.run_response = run_response
        self.run_id = run_id

        # If no retries are set, use the agent's default retries
        retries = retries if retries is not None else self.retries

        last_exception = None
        num_attempts = retries + 1

        for attempt in range(num_attempts):
            try:
                # Set run_input
                if message is not None:
                    if isinstance(message, str):
                        self.run_input = message
                    elif isinstance(message, Message):
                        self.run_input = message.to_dict()
                    else:
                        self.run_input = message
                elif messages is not None:
                    self.run_input = [m.to_dict() if isinstance(m, Message) else m for m in messages]

                # Prepare run messages
                run_messages: RunMessages = self.get_run_messages(
                    message=message,
                    session_id=session_id,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    messages=messages,
                    knowledge_filters=effective_filters,
                    **kwargs,
                )
                if len(run_messages.messages) == 0:
                    log_error("No messages to be sent to the model.")

                self.run_messages = run_messages

                if stream:
                    response_iterator = self._run_stream(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        stream_intermediate_steps=stream_intermediate_steps,
                        refresh_session_before_write=refresh_session_before_write,
                    )
                    return response_iterator
                else:
                    response = self._run(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        refresh_session_before_write=refresh_session_before_write,
                    )
                    return response
            except ModelProviderError as e:
                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
                if isinstance(e, StopAgentRun):
                    raise e
                last_exception = e
                if attempt < num_attempts - 1:  # Don't sleep on the last attempt
                    if self.exponential_backoff:
                        delay = 2**attempt * self.delay_between_retries
                    else:
                        delay = self.delay_between_retries
                    time.sleep(delay)
            except KeyboardInterrupt:
                self.run_response = self.create_run_response(
                    run_state=RunStatus.cancelled, content="Operation cancelled by user", run_response=run_response
                )
                if stream:
                    return generator_wrapper(  # type: ignore
                        create_run_response_cancelled_event(run_response, "Operation cancelled by user")
                    )
                else:
                    return self.run_response

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )
            if stream:
                return generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))  # type: ignore

            raise last_exception

        # Only reachable when the loop never ran (num_attempts <= 0). There is no exception
        # to report, so send a real message instead of str(None).
        failure_message = f"Failed after {num_attempts} attempts."
        if stream:
            return generator_wrapper(create_run_response_error_event(run_response, error=failure_message))  # type: ignore
        raise Exception(failure_message)

    async def _arun(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        refresh_session_before_write: Optional[bool] = False,
    ) -> RunResponse:
        """Asynchronously execute one non-streaming Agent run and return the RunResponse.

        Stages:
        1. Reasoning (when reasoning is enabled)
        2. Model call (including function calls), then the optional output and parser models
        3. Recording the run in memory -- returns early if any tool call paused the run
        4. Agent memory update
        5. Session metrics
        6. Persisting the session to storage
        7. Writing the response to a file when save_response_to_file is set
        """
        log_debug(f"Agent Run Start: {run_response.run_id}", center=True)

        self.model = cast(Model, self.model)

        # Stage 1: reasoning
        await self._ahandle_reasoning(run_messages=run_messages)

        # Everything appended to run_messages.messages after this position belongs to
        # this run and is copied into the RunResponse and memory.
        user_message_cutoff = len(run_messages.messages)

        # Stage 2: model response (function calls are executed inside model.aresponse)
        model_output: ModelResponse = await self.model.aresponse(
            messages=run_messages.messages,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
            response_format=response_format,
        )

        # Optional secondary models: output model first, then the parser model
        await self._agenerate_response_with_output_model(model_response=model_output, run_messages=run_messages)
        await self._aparse_response_with_parser_model(model_response=model_output, run_messages=run_messages)

        self._update_run_response(model_response=model_output, run_response=run_response, run_messages=run_messages)

        # Stage 3: record the run in memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=user_message_cutoff,
        )

        # A paused tool call ends the run here; the paused handler builds the response
        run_is_paused = any(tool.is_paused for tool in run_response.tools or [])
        if run_is_paused:
            return self._handle_agent_run_paused(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )

        # Stage 4: drain the memory-update stream so memory is updated before finishing
        memory_updates = self._aupdate_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        )
        async for _ in memory_updates:
            pass

        # Stage 5: session metrics
        self._set_session_metrics(run_messages)

        # Structured output conversion (if the agent requests it)
        self._convert_response_to_structured_format(run_response)

        # Stage 6: persist the session
        self.write_to_storage(user_id=user_id, session_id=session_id, refresh_session=refresh_session_before_write)

        # Stage 7: optionally save the response to a file
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        await self._alog_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")
        return run_response

    async def _arun_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
        refresh_session_before_write: Optional[bool] = False,
    ) -> AsyncIterator[RunResponseEvent]:
        """Asynchronously execute one streaming Agent run, yielding events as they are produced.

        Stages:
        1. Reasoning (when reasoning is enabled)
        2. Streaming model response, then the optional output and parser models
        3. Recording the run in memory -- stops early if any tool call paused the run
        4. Agent memory update
        5. Session metrics
        6. Writing the response to a file when save_response_to_file is set
        7. Persisting the session to storage
        """
        log_debug(f"Agent Run Start: {run_response.run_id}", center=True)
        # Announce the run when intermediate steps are streamed
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_started_event(run_response), run_response)

        # Stage 1: reasoning
        async for reasoning_event in self._ahandle_reasoning_stream(run_messages=run_messages):
            yield reasoning_event

        # Everything appended to run_messages.messages after this position belongs to
        # this run and is copied into the RunResponse and memory.
        user_message_cutoff = len(run_messages.messages)

        # Stage 2: model response
        model_stream = self._ahandle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        )
        if self.output_model is None:
            async for chunk in model_stream:
                yield chunk
        else:
            from agno.run.response import IntermediateRunResponseContentEvent, RunResponseContentEvent

            # With an output model configured, content from the main model is re-emitted
            # only as intermediate content (or dropped); the output model then generates output.
            async for chunk in model_stream:
                if not isinstance(chunk, RunResponseContentEvent):
                    yield chunk
                elif stream_intermediate_steps:
                    yield IntermediateRunResponseContentEvent(
                        content=chunk.content,
                        content_type=chunk.content_type,
                    )

            async for output_event in self._agenerate_response_with_output_model_stream(
                run_response=run_response,
                run_messages=run_messages,
                stream_intermediate_steps=stream_intermediate_steps,
            ):
                yield output_event

        # Optional parser model structures the response separately
        async for parser_event in self._aparse_response_with_parser_model_stream(
            run_response=run_response, stream_intermediate_steps=stream_intermediate_steps
        ):
            yield parser_event

        # Stage 3: record the run in memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=user_message_cutoff,
        )

        # A paused tool call ends the stream after the (synchronous) paused handler's events
        run_is_paused = any(tool.is_paused for tool in run_response.tools or [])
        if run_is_paused:
            for paused_event in self._handle_agent_run_paused_stream(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            ):
                yield paused_event
            return

        # Stage 4: memory update
        async for memory_event in self._aupdate_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
            stream_intermediate_steps=stream_intermediate_steps,
        ):
            yield memory_event

        # Stage 5: session metrics
        self._set_session_metrics(run_messages)

        # Stage 6: optionally save the response to a file
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # The completed event is emitted before the session is persisted
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_completed_event(from_run_response=run_response), run_response)

        # Stage 7: persist the session
        self.write_to_storage(user_id=user_id, session_id=session_id, refresh_session=refresh_session_before_write)

        await self._alog_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

    async def arun(
        self,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        *,
        stream: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        messages: Optional[Sequence[Union[Dict, Message]]] = None,
        stream_intermediate_steps: Optional[bool] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        refresh_session_before_write: Optional[bool] = False,
        **kwargs: Any,
    ) -> Any:
        """Async Run the Agent and return the response."""

        session_id, user_id = self._initialize_session(
            session_id=session_id, user_id=user_id, session_state=session_state
        )

        log_debug(f"Session ID: {session_id}", center=True)

        # Initialize the Agent
        self.initialize_agent()

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        effective_filters = knowledge_filters
        # When filters are passed manually
        if self.knowledge_filters or knowledge_filters:
            """
                initialize metadata (specially required in case when load is commented out)
                when load is not called the reader's document_lists won't be called and metadata filters won't be initialized
                so we need to call initialize_valid_filters to make sure the filters are initialized
            """
            if not self.knowledge.valid_metadata_filters:  # type: ignore
                self.knowledge.initialize_valid_filters()  # type: ignore

            effective_filters = self._get_effective_filters(knowledge_filters)

        # Agentic filters are enabled
        if self.enable_agentic_knowledge_filters and not self.knowledge.valid_metadata_filters:  # type: ignore
            # initialize metadata (specially required in case when load is commented out)
            self.knowledge.initialize_valid_filters()  # type: ignore

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False if self.stream_intermediate_steps is None else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        self.stream = self.stream or stream
        self.stream_intermediate_steps = self.stream_intermediate_steps or (stream_intermediate_steps and self.stream)

        # Read existing session from storage
        if self.context is not None:
            await self.aresolve_run_context()

        # Prepare arguments for the model
        self.set_default_model()
        response_format = self._get_response_format() if self.parser_model is None else None
        self.model = cast(Model, self.model)

        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=True,
            knowledge_filters=effective_filters,
        )

        # Create a run_id for this specific run
        run_id = str(uuid4())

        # Create a new run_response for this attempt
        run_response = RunResponse(
            run_id=run_id,
            session_id=session_id,
            agent_id=self.agent_id,
            agent_name=self.name,
            team_session_id=self.team_session_id,
        )

        run_response.model = self.model.id if self.model is not None else None
        run_response.model_provider = self.model.provider if self.model is not None else None

        self.run_response = run_response
        self.run_id = run_id

        # If no retries are set, use the agent's default retries
        retries = retries if retries is not None else self.retries

        last_exception = None
        num_attempts = retries + 1

        for attempt in range(num_attempts):
            try:
                # Set run_input
                if message is not None:
                    if isinstance(message, str):
                        self.run_input = message
                    elif isinstance(message, Message):
                        self.run_input = message.to_dict()
                    else:
                        self.run_input = message
                elif messages is not None:
                    self.run_input = [m.to_dict() if isinstance(m, Message) else m for m in messages]

                # Prepare run messages
                run_messages: RunMessages = self.get_run_messages(
                    message=message,
                    session_id=session_id,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    messages=messages,
                    knowledge_filters=effective_filters,
                    **kwargs,
                )
                if len(run_messages.messages) == 0:
                    log_error("No messages to be sent to the model.")

                self.run_messages = run_messages

                # Pass the new run_response to _arun
                if stream:
                    response_iterator = self._arun_stream(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        stream_intermediate_steps=stream_intermediate_steps,
                        refresh_session_before_write=refresh_session_before_write,
                    )  # type: ignore[assignment]
                    return response_iterator
                else:
                    response = await self._arun(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        refresh_session_before_write=refresh_session_before_write,
                    )
                    return response
            except ModelProviderError as e:
                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
                if isinstance(e, StopAgentRun):
                    raise e
                last_exception = e
                if attempt < num_attempts - 1:  # Don't sleep on the last attempt
                    if self.exponential_backoff:
                        delay = 2**attempt * self.delay_between_retries
                    else:
                        delay = self.delay_between_retries
                    import time

                    time.sleep(delay)
            except KeyboardInterrupt:
                self.run_response = self.create_run_response(
                    run_state=RunStatus.cancelled, content="Operation cancelled by user", run_response=run_response
                )
                if stream:
                    return async_generator_wrapper(
                        create_run_response_cancelled_event(run_response, "Operation cancelled by user")
                    )
                else:
                    return self.run_response

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )

            if stream:
                return async_generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))
            raise last_exception
        else:
            if stream:
                return async_generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))
            raise Exception(f"Failed after {num_attempts} attempts.")

    # Typing overload: with stream=False (the default) continue_run returns a single RunResponse.
    @overload
    def continue_run(
        self,
        run_response: Optional[RunResponse] = None,
        *,
        run_id: Optional[str] = None,
        updated_tools: Optional[List[ToolExecution]] = None,
        stream: Literal[False] = False,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> RunResponse: ...

    # Typing overload: with stream=True continue_run returns an iterator of RunResponseEvent.
    @overload
    def continue_run(
        self,
        run_response: Optional[RunResponse] = None,
        *,
        run_id: Optional[str] = None,
        updated_tools: Optional[List[ToolExecution]] = None,
        stream: Literal[True] = True,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Iterator[RunResponseEvent]: ...

    def continue_run(
        self,
        run_response: Optional[RunResponse] = None,
        *,
        run_id: Optional[str] = None,
        updated_tools: Optional[List[ToolExecution]] = None,
        stream: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Union[RunResponse, Iterator[RunResponseEvent]]:
        """Continue a previous (typically paused) run synchronously.

        The run to continue is resolved in this order: an explicit ``run_response``;
        otherwise a stored run looked up by ``run_id`` (whose tools are replaced by
        ``updated_tools``); otherwise the agent's current ``self.run_response``.

        Args:
            run_response: The run response to continue.
            run_id: The run id to continue. Alternative to passing run_response.
            updated_tools: The updated tools to use for the run. Required to be used with `run_id`.
            stream: Whether to stream the response. Falls back to ``self.stream`` (or False).
            stream_intermediate_steps: Whether to stream the intermediate steps. Forced to False
                when not streaming.
            user_id: The user id to continue the run for. Falls back to ``self.user_id``.
            session_id: The session id to continue the run for. Falls back to ``self.session_id``,
                or a freshly generated UUID when neither is set.
            retries: Number of additional attempts made after a ModelProviderError
                (total attempts = retries + 1). Falls back to ``self.retries``.
            knowledge_filters: The knowledge filters to use for the run.

        Returns:
            A RunResponse (from ``_continue_run``) when not streaming, or an iterator of
            RunResponseEvent (from ``_continue_run_stream``) when streaming.

        Raises:
            ValueError: If ``run_id`` is given without ``updated_tools``.
            RuntimeError: If no stored run matches ``run_id``.
            ModelProviderError: The last provider error, once all attempts are exhausted
                (non-streaming only; streaming returns an error-event iterator instead).
        """
        # Initialize the Agent
        self.initialize_agent()

        if session_id is not None:
            self.reset_run_state()
            # Reset session state if a session_id is provided. Session name and session state will be loaded from storage.
            self.reset_session()
            # Only reset session state if the session_id is different from the current session_id
            if self.session_id is not None and session_id != self.session_id:
                self.session_state = None

        # Initialize Session
        # Use the default user_id and session_id when necessary
        user_id = user_id if user_id is not None else self.user_id

        if session_id is None or session_id == "":
            if not (self.session_id is None or self.session_id == ""):
                session_id = self.session_id
            else:
                # Generate a new session_id and store it in the agent
                session_id = str(uuid4())
                self.session_id = session_id
        else:
            self.session_id = session_id

        self._initialize_session_state(user_id=user_id, session_id=session_id)

        log_debug(f"Session ID: {session_id}", center=True)

        effective_filters = knowledge_filters

        # When filters are passed manually
        # NOTE(review): this branch (and the agentic one below) dereferences self.knowledge
        # without a None check — presumably a knowledge base is required whenever filters are set.
        if self.knowledge_filters or knowledge_filters:
            """
                initialize metadata (specially required in case when load is commented out)
                when load is not called the reader's document_lists won't be called and metadata filters won't be initialized
                so we need to call initialize_valid_filters to make sure the filters are initialized
            """
            if not self.knowledge.valid_metadata_filters:  # type: ignore
                self.knowledge.initialize_valid_filters()  # type: ignore

            effective_filters = self._get_effective_filters(knowledge_filters)

        # Agentic filters are enabled
        if self.enable_agentic_knowledge_filters and not self.knowledge.valid_metadata_filters:  # type: ignore
            # initialize metadata (specially required in case when load is commented out)
            self.knowledge.initialize_valid_filters()  # type: ignore

        # If no retries are set, use the agent's default retries
        retries = retries if retries is not None else self.retries

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False if self.stream_intermediate_steps is None else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        # Persist the effective streaming flags on the agent (only ever turns them on, never off)
        self.stream = self.stream or stream
        self.stream_intermediate_steps = self.stream_intermediate_steps or (stream_intermediate_steps and self.stream)

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        # Run can be continued from previous run response or from passed run_response context
        if run_response is not None:
            # The run is continued from a provided run_response. This contains the updated tools.
            messages = run_response.messages or []
            self.run_response = run_response
            self.run_id = run_response.run_id
        elif run_id is not None:
            # The run is continued from a run_id. This requires the updated tools to be passed.
            if updated_tools is None:
                raise ValueError("Updated tools are required to continue a run from a run_id.")

            # Memory v2 exposes runs per session; legacy AgentMemory stores AgentRun wrappers
            # whose RunResponse lives under `.response`.
            if isinstance(self.memory, Memory):
                runs = self.memory.get_runs(session_id=session_id)
                run_response = next((r for r in runs if r.run_id == run_id), None)  # type: ignore
            else:
                runs = self.memory.runs  # type: ignore
                run_response = next((r for r in runs if r.response.run_id == run_id), None)  # type: ignore
            if run_response is None:
                raise RuntimeError(f"No runs found for run ID {run_id}")
            run_response.tools = updated_tools
            messages = run_response.messages or []
            self.run_response = run_response
            self.run_id = run_id
        else:
            self.run_response = cast(RunResponse, self.run_response)
            self.run_response.status = RunStatus.running
            # We are continuing from a previous run_response in state
            run_response = self.run_response
            messages = self.run_response.messages or []
            self.run_id = self.run_response.run_id

        # Resolve the run context, if one is configured
        if self.context is not None:
            self.resolve_run_context()

        # Prepare arguments for the model
        self.set_default_model()
        response_format = self._get_response_format()
        self.model = cast(Model, self.model)

        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=False,
            knowledge_filters=effective_filters,
        )

        last_exception = None
        num_attempts = retries + 1
        for attempt in range(num_attempts):
            run_response = cast(RunResponse, run_response)

            log_debug(f"Agent Run Start: {run_response.run_id}", center=True)

            # Prepare run messages
            self.run_messages = self.get_continue_run_messages(
                messages=messages,
            )

            # Set run_input
            if self.run_messages.user_message is not None:
                if isinstance(self.run_messages.user_message, str):
                    self.run_input = self.run_messages.user_message
                elif isinstance(self.run_messages.user_message, Message):
                    self.run_input = self.run_messages.user_message.to_dict()
                else:
                    self.run_input = self.run_messages.user_message

            # Reset the run state
            run_response.status = RunStatus.running

            try:
                if stream:
                    # NOTE(review): _continue_run_stream is a generator function, so this call only
                    # creates the generator — its body (and any ModelProviderError it raises) runs
                    # after we return, outside this try. Retries therefore only apply to the
                    # non-streaming path.
                    response_iterator = self._continue_run_stream(
                        run_response=run_response,
                        run_messages=self.run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        stream_intermediate_steps=stream_intermediate_steps,
                    )
                    return response_iterator
                else:
                    response = self._continue_run(
                        run_response=run_response,
                        run_messages=self.run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                    )
                    return response
            except ModelProviderError as e:
                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
                if isinstance(e, StopAgentRun):
                    raise e
                last_exception = e
                if attempt < num_attempts - 1:  # Don't sleep on the last attempt
                    if self.exponential_backoff:
                        delay = 2**attempt * self.delay_between_retries
                    else:
                        delay = self.delay_between_retries
                    import time

                    time.sleep(delay)
            except KeyboardInterrupt:
                if stream:
                    return generator_wrapper(  # type: ignore
                        create_run_response_cancelled_event(run_response, "Operation cancelled by user")
                    )
                else:
                    return self.create_run_response(
                        run_state=RunStatus.cancelled, content="Operation cancelled by user", run_response=run_response
                    )

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )

            if stream:
                return generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))  # type: ignore
            raise last_exception
        else:
            # Only reachable when num_attempts <= 0 (i.e. a negative `retries`); the error text is then "None".
            if stream:
                return generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))  # type: ignore
            raise Exception(f"Failed after {num_attempts} attempts.")

    def _continue_run(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
    ) -> RunResponse:
        """Continue a previous run synchronously (non-streaming).

        Steps:
        1. Handle any updated tools
        2. Generate a response from the Model
        3. Add the run to memory
        4. Update Agent Memory
        5. Calculate session metrics
        6. Save session to storage
        7. Save output to file if save_response_to_file is set

        Args:
            run_response: The run being continued; updated in place.
            run_messages: Messages to send to the model (from `get_continue_run_messages`).
            session_id: Session the run belongs to.
            user_id: User the run belongs to, if any.
            response_format: Optional response format passed to the model.

        Returns:
            The updated RunResponse, or the result of `_handle_agent_run_paused` if any
            tool call pauses the run again.
        """
        self.model = cast(Model, self.model)

        # 1. Handle the updated tools
        self._handle_tool_call_updates(run_response=run_response, run_messages=run_messages)

        # Record how many messages exist before the model responds, so that only messages
        # added after this index are attached to the RunResponse and Memory.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Generate a response from the Model (includes running function calls)
        model_response: ModelResponse = self.model.response(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        )

        self._update_run_response(model_response=model_response, run_response=run_response, run_messages=run_messages)

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # A tool call paused the run again: hand off to the paused handler and stop here
        if any(tool_call.is_paused for tool_call in run_response.tools or []):
            return self._handle_agent_run_paused(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )

        # 4. Update Agent Memory
        response_iterator = self._update_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        )
        # Consume the response iterator to ensure the memory is updated before the run is completed
        deque(response_iterator, maxlen=0)

        # 5. Calculate session metrics
        self._set_session_metrics(run_messages)

        # Convert the response to the structured format if needed
        self._convert_response_to_structured_format(run_response)

        # 6. Save session to storage
        self.write_to_storage(user_id=user_id, session_id=session_id)

        # 7. Save output to file if save_response_to_file is set
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # Log Agent Run
        self._log_agent_run(user_id=user_id, session_id=session_id)

        # Match the end-of-run debug log emitted by _acontinue_run and _continue_run_stream
        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

        # NOTE(review): status is set back to RUNNING (the value the caller assigned before this
        # method ran) rather than a terminal status — confirm this is intended.
        run_response.status = RunStatus.running

        return run_response

    def _continue_run_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[RunResponseEvent]:
        """Continue a previous run, yielding events as they are produced (streaming variant of `_continue_run`).

        This is a generator function: none of the steps below run until the caller starts iterating.

        Steps:
        1. Handle any updated tools
        2. Generate a response from the Model
        3. Add the run to memory
        4. Update Agent Memory
        5. Calculate session metrics
        6. Save output to file if save_response_to_file is set
        7. Save session to storage

        Args:
            run_response: The run being continued; updated in place.
            run_messages: Messages to send to the model (from `get_continue_run_messages`).
            session_id: Session the run belongs to.
            user_id: User the run belongs to, if any.
            response_format: Optional response format passed to the model.
            stream_intermediate_steps: When True, also yield RunContinued / RunCompleted events
                and forward intermediate-step events from the helpers.

        Yields:
            RunResponseEvent objects from tool-call handling, the model stream, memory updates,
            and (if the run pauses again) the paused handler.
        """
        # Start the Run by yielding a RunContinued event
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_continued_event(run_response), run_response)

        # 1. Handle the updated tools
        yield from self._handle_tool_call_updates_stream(run_response=run_response, run_messages=run_messages)

        # Record how many messages exist before the model responds, so that only messages
        # added after this index are attached to the RunResponse and Memory.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Process model response
        for event in self._handle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        ):
            yield event

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # A tool call paused the run again: stream the paused handler's events and stop here
        if any(tool_call.is_paused for tool_call in run_response.tools or []):
            yield from self._handle_agent_run_paused_stream(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )
            return

        # 4. Update Agent Memory
        yield from self._update_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
            stream_intermediate_steps=stream_intermediate_steps,
        )

        # 5. Calculate session metrics
        self._set_session_metrics(run_messages)

        # 6. Save output to file if save_response_to_file is set
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # NOTE(review): the RunCompleted event is yielded before write_to_storage(); a consumer that
        # stops iterating right after receiving it would skip the storage write — confirm the ordering.
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_completed_event(run_response), run_response)

        # 7. Save session to storage
        self.write_to_storage(user_id=user_id, session_id=session_id)

        # Log Agent Run
        self._log_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

    async def acontinue_run(
        self,
        run_response: Optional[RunResponse] = None,
        *,
        run_id: Optional[str] = None,
        updated_tools: Optional[List[ToolExecution]] = None,
        stream: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        retries: Optional[int] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Continue a previous (typically paused) run asynchronously.

        The run to continue is resolved in this order: an explicit ``run_response``;
        otherwise a stored run looked up by ``run_id`` (whose tools are replaced by
        ``updated_tools``); otherwise the agent's current ``self.run_response``.

        Args:
            run_response: The run response to continue.
            run_id: The run id to continue. Alternative to passing run_response.
            updated_tools: The updated tools to use for the run. Required to be used with `run_id`.
            stream: Whether to stream the response. Falls back to ``self.stream`` (or False).
            stream_intermediate_steps: Whether to stream the intermediate steps. Forced to False
                when not streaming.
            user_id: The user id to continue the run for. Falls back to ``self.user_id``.
            session_id: The session id to continue the run for. Falls back to ``self.session_id``,
                or a freshly generated UUID when neither is set.
            retries: Number of additional attempts made after a ModelProviderError
                (total attempts = retries + 1). Falls back to ``self.retries``.
            knowledge_filters: The knowledge filters to use for the run.

        Returns:
            A RunResponse (from ``_acontinue_run``) when not streaming, or an async iterator of
            RunResponseEvent (from ``_acontinue_run_stream``) when streaming.

        Raises:
            ValueError: If ``run_id`` is given without ``updated_tools``.
            RuntimeError: If no stored run matches ``run_id``.
            ModelProviderError: The last provider error, once all attempts are exhausted
                (non-streaming only; streaming returns an error-event iterator instead).
        """
        # Initialize the Agent
        self.initialize_agent()

        if session_id is not None:
            self.reset_run_state()
            # Reset session state if a session_id is provided. Session name and session state will be loaded from storage.
            self.reset_session()
            # Only reset session state if the session_id is different from the current session_id
            if self.session_id is not None and session_id != self.session_id:
                self.session_state = None

        # Initialize Session
        # Use the default user_id and session_id when necessary
        user_id = user_id if user_id is not None else self.user_id

        if session_id is None or session_id == "":
            if not (self.session_id is None or self.session_id == ""):
                session_id = self.session_id
            else:
                # Generate a new session_id and store it in the agent
                session_id = str(uuid4())
                self.session_id = session_id
        else:
            self.session_id = session_id

        self._initialize_session_state(user_id=user_id, session_id=session_id)

        log_debug(f"Session ID: {session_id}", center=True)

        effective_filters = knowledge_filters

        # When filters are passed manually
        if self.knowledge_filters or knowledge_filters:
            # Initialize metadata filters. This is required when knowledge load() was not called:
            # the reader's document lists are then never built, so the valid metadata filters
            # would otherwise stay uninitialized.
            if not self.knowledge.valid_metadata_filters:  # type: ignore
                self.knowledge.initialize_valid_filters()  # type: ignore

            effective_filters = self._get_effective_filters(knowledge_filters)

        # Agentic filters are enabled
        if self.enable_agentic_knowledge_filters and not self.knowledge.valid_metadata_filters:  # type: ignore
            # initialize metadata (specially required in case when load is commented out)
            self.knowledge.initialize_valid_filters()  # type: ignore

        # If no retries are set, use the agent's default retries
        retries = retries if retries is not None else self.retries

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False if self.stream_intermediate_steps is None else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        # Persist the effective streaming flags on the agent (only ever turns them on, never off)
        self.stream = self.stream or stream
        self.stream_intermediate_steps = self.stream_intermediate_steps or (stream_intermediate_steps and self.stream)

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        # Run can be continued from previous run response or from passed run_response context
        if run_response is not None:
            # The run is continued from a provided run_response. This contains the updated tools.
            messages = run_response.messages or []
            self.run_response = run_response
            self.run_id = run_response.run_id
        elif run_id is not None:
            # The run is continued from a run_id. This requires the updated tools to be passed.
            if updated_tools is None:
                raise ValueError("Updated tools are required to continue a run from a run_id.")

            if isinstance(self.memory, Memory):
                runs = self.memory.get_runs(session_id=session_id)
                run_response = next((r for r in runs if r.run_id == run_id), None)  # type: ignore
            else:
                runs = self.memory.runs  # type: ignore
                run_response = next((r for r in runs if r.response.run_id == run_id), None)  # type: ignore
            if run_response is None:
                raise RuntimeError(f"No runs found for run ID {run_id}")
            run_response.tools = updated_tools
            messages = run_response.messages or []
            self.run_response = run_response
            self.run_id = run_id
        else:
            # We are continuing from a previous run_response in state
            self.run_response = cast(RunResponse, self.run_response)
            run_response = self.run_response
            messages = self.run_response.messages or []
            self.run_id = self.run_response.run_id

        # Resolve the run context, if one is configured
        if self.context is not None:
            await self.aresolve_run_context()

        # Prepare arguments for the model
        self.set_default_model()
        response_format = self._get_response_format()
        self.model = cast(Model, self.model)

        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=True,
            knowledge_filters=effective_filters,
        )

        last_exception = None
        num_attempts = retries + 1
        for attempt in range(num_attempts):
            run_response = cast(RunResponse, run_response)

            log_debug(f"Agent Run Start: {run_response.run_id}", center=True)

            # Prepare run messages. `messages` is passed through unmodified, as in `continue_run`.
            # It aliases run_response.messages, and removing the user message from it would both
            # hide the user turn from the model and permanently alter the stored run.
            run_messages: RunMessages = self.get_continue_run_messages(
                messages=messages,
            )

            # Set run_input from the user message identified in run_messages
            user_message = run_messages.user_message
            if user_message is not None:
                if isinstance(user_message, str):
                    self.run_input = user_message
                elif isinstance(user_message, Message):
                    self.run_input = user_message.to_dict()
                else:
                    self.run_input = user_message
            else:
                # No user message available: record the raw message list as the run input
                self.run_input = [m.to_dict() if isinstance(m, Message) else m for m in messages]

            # Reset the run paused state
            run_response.status = RunStatus.running

            try:
                if stream:
                    response_iterator = self._acontinue_run_stream(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                        stream_intermediate_steps=stream_intermediate_steps,
                    )
                    return response_iterator
                else:
                    response = await self._acontinue_run(
                        run_response=run_response,
                        run_messages=run_messages,
                        user_id=user_id,
                        session_id=session_id,
                        response_format=response_format,
                    )
                    return response
            except ModelProviderError as e:
                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
                if isinstance(e, StopAgentRun):
                    raise e
                last_exception = e
                if attempt < num_attempts - 1:  # Don't sleep on the last attempt
                    if self.exponential_backoff:
                        delay = 2**attempt * self.delay_between_retries
                    else:
                        delay = self.delay_between_retries
                    # Non-blocking backoff: time.sleep() here would stall the whole event loop.
                    await asyncio.sleep(delay)
            except KeyboardInterrupt:
                if stream:
                    return async_generator_wrapper(
                        create_run_response_cancelled_event(run_response, "Operation cancelled by user")
                    )
                else:
                    return self.create_run_response(
                        run_state=RunStatus.cancelled, content="Operation cancelled by user", run_response=run_response
                    )

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )
            if stream:
                return async_generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))
            raise last_exception
        else:
            # Only reachable when num_attempts <= 0 (i.e. a negative `retries`)
            if stream:
                return async_generator_wrapper(create_run_response_error_event(run_response, error=str(last_exception)))
            raise Exception(f"Failed after {num_attempts} attempts.")

    async def _acontinue_run(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
    ) -> RunResponse:
        """Continue a previous run asynchronously (non-streaming).

        Steps:
        1. Handle any updated tools
        2. Generate a response from the Model
        3. Add the run to memory
        4. Update Agent Memory
        5. Calculate session metrics
        6. Save session to storage
        7. Save output to file if save_response_to_file is set

        Args:
            run_response: The run being continued; updated in place.
            run_messages: Messages to send to the model (from `get_continue_run_messages`).
            session_id: Session the run belongs to.
            user_id: User the run belongs to, if any.
            response_format: Optional response format passed to the model.

        Returns:
            The updated RunResponse, or the result of `_handle_agent_run_paused` if any
            tool call pauses the run again.
        """

        self.model = cast(Model, self.model)

        # 1. Handle the updated tools
        await self._ahandle_tool_call_updates(run_response=run_response, run_messages=run_messages)

        # Record how many messages exist before the model responds, so that only messages
        # added after this index are attached to the RunResponse and Memory.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Generate a response from the Model (includes running function calls)
        model_response: ModelResponse = await self.model.aresponse(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        )

        self._update_run_response(model_response=model_response, run_response=run_response, run_messages=run_messages)

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # A tool call paused the run again: hand off to the (synchronous) paused handler and stop here
        if any(tool_call.is_paused for tool_call in run_response.tools or []):
            return self._handle_agent_run_paused(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            )

        # 4. Update Agent Memory (drain the async iterator; its events are discarded here)
        async for _ in self._aupdate_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        ):
            pass

        # 5. Calculate session metrics
        self._set_session_metrics(run_messages)

        # Convert the response to the structured format if needed
        self._convert_response_to_structured_format(run_response)

        # 6. Save session to storage
        self.write_to_storage(user_id=user_id, session_id=session_id)

        # 7. Save output to file if save_response_to_file is set
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # Log Agent Run
        await self._alog_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

        # NOTE(review): status is set back to RUNNING (the value acontinue_run assigned before
        # calling this) rather than a terminal status — confirm this is intended.
        run_response.status = RunStatus.running

        return run_response

    async def _acontinue_run_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> AsyncIterator[RunResponseEvent]:
        """Continue a previously paused run (async, streaming), yielding events as it goes.

        Steps:
        1. Handle any updated tools (tool call events are yielded)
        2. Generate a response from the Model (model response events are yielded)
        3. Add the run to memory; if a tool call is paused again, yield the paused event and stop
        4. Update Agent Memory
        5. Calculate session metrics
        6. Save output to file if save_response_to_file is set
        7. Save session to storage

        The RunContinued and RunCompleted events are only yielded when
        ``stream_intermediate_steps`` is True.

        Args:
            run_response: The run being continued. It is updated in place.
            run_messages: The run's messages. Tool results and model output are appended to it.
            session_id: Session the run belongs to.
            user_id: Optional user the run belongs to.
            response_format: Passed through to ``_ahandle_model_response_stream``.
            stream_intermediate_steps: Whether to yield intermediate lifecycle events.

        Yields:
            RunResponseEvent objects produced while continuing the run.
        """
        # Start the Run by yielding a RunContinued event
        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_continued_event(run_response), run_response)

        # 1. Handle the updated tools
        async for event in self._ahandle_tool_call_updates_stream(run_response=run_response, run_messages=run_messages):
            yield event

        # Despite its name, this is the message count *after* the tool updates. Messages appended
        # from here on are new in this continuation; _add_run_to_memory uses it as a start index.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Process model response
        # NOTE(review): unlike _acontinue_run, _convert_response_to_structured_format is not called
        # in this method. Presumably _ahandle_model_response_stream handles structured output; confirm.
        async for event in self._ahandle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        ):
            yield event

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # We should break out of the run function: a tool call is paused again.
        # NOTE(review): _handle_agent_run_paused_stream is a sync generator that logs via the sync
        # _log_agent_run, whereas the normal path below awaits _alog_agent_run.
        if any(tool_call.is_paused for tool_call in run_response.tools or []):
            for item in self._handle_agent_run_paused_stream(
                run_response=run_response, run_messages=run_messages, session_id=session_id, user_id=user_id
            ):
                yield item
            return

        # 4. Update Agent Memory
        async for event in self._aupdate_memory(
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
            stream_intermediate_steps=stream_intermediate_steps,
        ):
            yield event

        # 5. Calculate session metrics
        self._set_session_metrics(run_messages)

        # 6. Save output to file if save_response_to_file is set
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        if stream_intermediate_steps:
            yield self._handle_event(create_run_response_completed_event(run_response), run_response)

        # 7. Save session to storage
        self.write_to_storage(user_id=user_id, session_id=session_id)

        # Log Agent Run
        await self._alog_agent_run(user_id=user_id, session_id=session_id)

        log_debug(f"Agent Run End: {run_response.run_id}", center=True, symbol="*")

    def _handle_agent_run_paused(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> RunResponse:
        """Mark ``run_response`` as paused, persist it and return it to the caller.

        Used when at least one tool call of the run is paused. The status becomes
        ``RunStatus.paused``. If the response has no content, it is filled in from
        ``get_paused_content``. The session is then written to storage, the run is logged
        and the response is saved to file (if configured).
        """
        run_response.status = RunStatus.paused
        run_response.content = run_response.content or get_paused_content(run_response)

        # Persist and log the paused state
        self.write_to_storage(user_id=user_id, session_id=session_id)
        self._log_agent_run(user_id=user_id, session_id=session_id)
        log_debug(f"Agent Run Paused: {run_response.run_id}", center=True, symbol="*")
        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # The caller resolves the paused tool calls and then continues the run
        return run_response

    def _handle_agent_run_paused_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> Iterator[RunResponseEvent]:
        """Streaming counterpart of ``_handle_agent_run_paused``.

        Marks the run as paused, fills empty content from ``get_paused_content``, saves the
        response to file (if configured) and yields a single paused event carrying the run's
        tools. After that event is consumed, the session is written to storage and the run is
        logged.
        """
        run_response.status = RunStatus.paused
        run_response.content = run_response.content or get_paused_content(run_response)

        self.save_run_response_to_file(message=run_messages.user_message, session_id=session_id)

        # Tell the consumer which tool calls are awaiting confirmation/completion
        paused_event = create_run_response_paused_event(
            from_run_response=run_response,
            tools=run_response.tools,
        )
        yield self._handle_event(paused_event, run_response)

        # Persist and log the paused state
        self.write_to_storage(user_id=user_id, session_id=session_id)
        self._log_agent_run(user_id=user_id, session_id=session_id)
        log_debug(f"Agent Run Paused: {run_response.run_id}", center=True, symbol="*")

    def _convert_response_to_structured_format(self, run_response: Union[RunResponse, ModelResponse]):
        # Convert the response to the structured format if needed
        if self.response_model is not None and not isinstance(run_response.content, self.response_model):
            if isinstance(run_response.content, str) and self.parse_response:
                try:
                    structured_output = parse_response_model_str(run_response.content, self.response_model)

                    # Update RunResponse
                    if structured_output is not None:
                        run_response.content = structured_output
                        if hasattr(run_response, "content_type"):
                            run_response.content_type = self.response_model.__name__
                    else:
                        log_warning("Failed to convert response to response_model")
                except Exception as e:
                    log_warning(f"Failed to convert response to output model: {e}")
            else:
                log_warning("Something went wrong. Run response content is not a string")

    def _handle_external_execution_update(self, run_messages: RunMessages, tool: ToolExecution):
        """Record the result of an externally executed tool call in the run's messages.

        A tool-role message built from ``tool.result`` is appended to ``run_messages.messages``,
        unless a message with the same ``tool_call_id`` is already present. In both cases
        ``tool.external_execution_required`` is cleared.

        Raises:
            ValueError: if no result was supplied for the tool (``tool.result is None``).
        """
        self.model = cast(Model, self.model)

        if tool.result is None:
            raise ValueError(f"Tool {tool.tool_name} requires external execution, cannot continue run")

        # Skip if the message is already in the run_messages. The previous loop only `break`-ed out
        # of itself and then appended anyway, duplicating the tool result. A missing tool_call_id
        # must not match the many unrelated messages that also have no tool_call_id.
        already_recorded = tool.tool_call_id is not None and any(
            msg.tool_call_id == tool.tool_call_id for msg in run_messages.messages
        )

        if not already_recorded:
            run_messages.messages.append(
                Message(
                    role=self.model.tool_message_role,
                    content=tool.result,
                    tool_call_id=tool.tool_call_id,
                    tool_name=tool.tool_name,
                    tool_args=tool.tool_args,
                    tool_call_error=tool.tool_call_error,
                    stop_after_tool_call=tool.stop_after_tool_call,
                )
            )
        tool.external_execution_required = False

    def _handle_user_input_update(self, tool: ToolExecution):
        for field in tool.user_input_schema or []:
            if not tool.tool_args:
                tool.tool_args = {}
            tool.tool_args[field.name] = field.value

    def _handle_get_user_input_tool_update(self, run_messages: RunMessages, tool: ToolExecution):
        """Answer an agentic ``get_user_input`` tool call with the values the user supplied.

        A tool-role message is appended to ``run_messages.messages``. Its content is
        ``"User inputs retrieved: "`` followed by a JSON list with one ``{"name", "value"}``
        object per field in ``tool.user_input_schema``. Tools without a non-empty
        ``user_input_schema`` are skipped, so their ``tool_call_id`` is not repeated.
        """
        import json

        self.model = cast(Model, self.model)

        fields = getattr(tool, "user_input_schema", None)
        if not fields:
            return

        answers = [{"name": entry.name, "value": entry.value} for entry in fields]
        tool_result = Message(
            role=self.model.tool_message_role,
            content=f"User inputs retrieved: {json.dumps(answers)}",
            tool_call_id=tool.tool_call_id,
            tool_name=tool.tool_name,
            tool_args=tool.tool_args,
            metrics=MessageMetrics(time=0),
        )
        run_messages.messages.append(tool_result)

    def _run_tool(self, run_messages: RunMessages, tool: ToolExecution) -> Iterator[RunResponseEvent]:
        """Execute a single tool call synchronously, yielding its started/completed events.

        The function call is resolved from ``tool`` against ``self._functions_for_model`` and run
        through ``Model.run_function_call``. When the completed event arrives, the result and
        error flag are copied onto ``tool``. Any result messages produced are appended to
        ``run_messages.messages`` after execution finishes.
        """
        self.run_response = cast(RunResponse, self.run_response)
        self.model = cast(Model, self.model)

        fc = self.model.get_function_call_to_run_from_tool_execution(tool, self._functions_for_model)
        produced: List[Message] = []

        for outcome in self.model.run_function_call(function_call=fc, function_call_results=produced):
            if not isinstance(outcome, ModelResponse):
                continue

            if outcome.event == ModelResponseEvent.tool_call_started.value:
                started = create_tool_call_started_event(from_run_response=self.run_response, tool=tool)
                yield self._handle_event(started, self.run_response)

            if outcome.event == ModelResponseEvent.tool_call_completed.value and outcome.tool_executions:
                executed = outcome.tool_executions[0]
                tool.result = executed.result
                tool.tool_call_error = executed.tool_call_error
                completed = create_tool_call_completed_event(
                    from_run_response=self.run_response, tool=tool, content=outcome.content
                )
                yield self._handle_event(completed, self.run_response)

        if produced:
            run_messages.messages.extend(produced)

    def _reject_tool_call(self, run_messages: RunMessages, tool: ToolExecution):
        """Record a rejected tool call as a failed function-call result in the run's messages.

        The error text is the user's ``confirmation_note`` when present. Otherwise it is a
        default rejection message.
        """
        self.model = cast(Model, self.model)
        rejected_call = self.model.get_function_call_to_run_from_tool_execution(tool, self._functions_for_model)
        rejected_call.error = tool.confirmation_note or "Function call was rejected by the user"
        run_messages.messages.append(
            self.model.create_function_call_result(function_call=rejected_call, success=False)
        )

    async def _arun_tool(
        self,
        run_messages: RunMessages,
        tool: ToolExecution,
    ) -> AsyncIterator[RunResponseEvent]:
        """Async counterpart of ``_run_tool``: execute one tool call and yield its events.

        The call runs through ``Model.arun_function_calls`` with ``skip_pause_check=True``.
        Presumably this lets a previously paused tool execute now; confirm against Model.
        On the completed event, the result and error flag are copied onto ``tool``. Produced
        result messages are appended to ``run_messages.messages`` at the end.
        """
        self.run_response = cast(RunResponse, self.run_response)
        self.model = cast(Model, self.model)

        fc = self.model.get_function_call_to_run_from_tool_execution(tool, self._functions_for_model)
        produced: List[Message] = []

        async for outcome in self.model.arun_function_calls(
            function_calls=[fc],
            function_call_results=produced,
            skip_pause_check=True,
        ):
            if not isinstance(outcome, ModelResponse):
                continue

            if outcome.event == ModelResponseEvent.tool_call_started.value:
                started = create_tool_call_started_event(from_run_response=self.run_response, tool=tool)
                yield self._handle_event(started, self.run_response)

            if outcome.event == ModelResponseEvent.tool_call_completed.value and outcome.tool_executions:
                executed = outcome.tool_executions[0]
                tool.result = executed.result
                tool.tool_call_error = executed.tool_call_error
                completed = create_tool_call_completed_event(
                    from_run_response=self.run_response, tool=tool, content=outcome.content
                )
                yield self._handle_event(completed, self.run_response)

        if produced:
            run_messages.messages.extend(produced)

    def _handle_tool_call_updates(self, run_response: RunResponse, run_messages: RunMessages):
        """Apply user/external updates to the run's tool calls before continuing (sync, no events).

        For each tool call in ``run_response.tools``:

        1. Requires confirmation (and the agent has functions): run it if it was confirmed and has
           no result yet; otherwise record a rejection. Either way it stops requiring confirmation.
        2. Requires external execution: append the externally supplied result to the messages.
        3. Agentic ``get_user_input`` call: append the collected inputs as the tool result.
        4. Requires user input: merge the user's values into the tool args and run the tool.

        Events produced while running tools are discarded.
        """
        self.model = cast(Model, self.model)
        for _t in run_response.tools or []:
            # Case 1: Handle confirmed tools and execute them
            if _t.requires_confirmation is True and self._functions_for_model:
                # Tool is confirmed and hasn't been run before
                if _t.confirmed is True and _t.result is None:
                    # Consume the generator without yielding
                    deque(self._run_tool(run_messages, _t), maxlen=0)
                else:
                    self._reject_tool_call(run_messages, _t)
                    _t.confirmed = False
                    _t.confirmation_note = _t.confirmation_note or "Tool call was rejected"
                    _t.tool_call_error = True
                _t.requires_confirmation = False

            # Case 2: Handle external execution required tools
            elif _t.external_execution_required is True:
                self._handle_external_execution_update(run_messages=run_messages, tool=_t)

            # Case 3: Agentic user input required
            elif _t.tool_name == "get_user_input" and _t.requires_user_input is True:
                self._handle_get_user_input_tool_update(run_messages=run_messages, tool=_t)
                _t.requires_user_input = False
                # Previously missing here; the stream/async variants all mark the tool as answered.
                _t.answered = True

            # Case 4: Handle user input required tools
            elif _t.requires_user_input is True:
                self._handle_user_input_update(tool=_t)
                _t.requires_user_input = False
                _t.answered = True
                # Consume the generator without yielding
                deque(self._run_tool(run_messages, _t), maxlen=0)

    def _handle_tool_call_updates_stream(
        self, run_response: RunResponse, run_messages: RunMessages
    ) -> Iterator[RunResponseEvent]:
        """Streaming variant of ``_handle_tool_call_updates``: yields tool call events.

        Confirmation requests are settled (run if approved and not yet executed, rejected
        otherwise). External results are recorded. Agentic ``get_user_input`` calls are answered.
        Tools awaiting user input get their arguments filled in and are executed.
        """
        self.model = cast(Model, self.model)
        for tool_exec in run_response.tools or []:
            # Confirmation flow
            if tool_exec.requires_confirmation is True and self._functions_for_model:
                if tool_exec.confirmed is True and tool_exec.result is None:
                    yield from self._run_tool(run_messages, tool_exec)
                else:
                    self._reject_tool_call(run_messages, tool_exec)
                    tool_exec.confirmed = False
                    tool_exec.confirmation_note = tool_exec.confirmation_note or "Tool call was rejected"
                    tool_exec.tool_call_error = True
                tool_exec.requires_confirmation = False
                continue

            # External execution flow: the caller supplied the result
            if tool_exec.external_execution_required is True:
                self._handle_external_execution_update(run_messages=run_messages, tool=tool_exec)
                continue

            # Only the user-input flows remain
            if tool_exec.requires_user_input is not True:
                continue

            if tool_exec.tool_name == "get_user_input":
                # Agentic user input: the collected values become the tool's result
                self._handle_get_user_input_tool_update(run_messages=run_messages, tool=tool_exec)
            else:
                # Regular tool whose arguments were supplied by the user: run it now
                self._handle_user_input_update(tool=tool_exec)
                yield from self._run_tool(run_messages, tool_exec)
            tool_exec.requires_user_input = False
            tool_exec.answered = True

    async def _ahandle_tool_call_updates(self, run_response: RunResponse, run_messages: RunMessages):
        """Async, non-streaming variant of ``_handle_tool_call_updates`` (tool events discarded).

        Confirmation requests are settled (run if approved and not yet executed, rejected
        otherwise). External results are recorded. Agentic ``get_user_input`` calls are answered.
        Tools awaiting user input get their arguments filled in and are executed.
        """
        self.model = cast(Model, self.model)
        for tool_exec in run_response.tools or []:
            # Confirmation flow
            if tool_exec.requires_confirmation is True and self._functions_for_model:
                if tool_exec.confirmed is True and tool_exec.result is None:
                    async for _ in self._arun_tool(run_messages, tool_exec):
                        pass
                else:
                    self._reject_tool_call(run_messages, tool_exec)
                    tool_exec.confirmed = False
                    tool_exec.confirmation_note = tool_exec.confirmation_note or "Tool call was rejected"
                    tool_exec.tool_call_error = True
                tool_exec.requires_confirmation = False
                continue

            # External execution flow: the caller supplied the result
            if tool_exec.external_execution_required is True:
                self._handle_external_execution_update(run_messages=run_messages, tool=tool_exec)
                continue

            # Only the user-input flows remain
            if tool_exec.requires_user_input is not True:
                continue

            if tool_exec.tool_name == "get_user_input":
                # Agentic user input: the collected values become the tool's result
                self._handle_get_user_input_tool_update(run_messages=run_messages, tool=tool_exec)
            else:
                # Regular tool whose arguments were supplied by the user: run it now
                self._handle_user_input_update(tool=tool_exec)
                async for _ in self._arun_tool(run_messages, tool_exec):
                    pass
            tool_exec.requires_user_input = False
            tool_exec.answered = True

    async def _ahandle_tool_call_updates_stream(
        self, run_response: RunResponse, run_messages: RunMessages
    ) -> AsyncIterator[RunResponseEvent]:
        """Async streaming variant of ``_handle_tool_call_updates``: yields tool call events.

        Confirmation requests are settled (run if approved and not yet executed, rejected
        otherwise). External results are recorded. Agentic ``get_user_input`` calls are answered.
        Tools awaiting user input get their arguments filled in and are executed.
        """
        self.model = cast(Model, self.model)
        for tool_exec in run_response.tools or []:
            # Confirmation flow
            if tool_exec.requires_confirmation is True and self._functions_for_model:
                if tool_exec.confirmed is True and tool_exec.result is None:
                    async for event in self._arun_tool(run_messages, tool_exec):
                        yield event
                else:
                    self._reject_tool_call(run_messages, tool_exec)
                    tool_exec.confirmed = False
                    tool_exec.confirmation_note = tool_exec.confirmation_note or "Tool call was rejected"
                    tool_exec.tool_call_error = True
                tool_exec.requires_confirmation = False
                continue

            # External execution flow: the caller supplied the result
            if tool_exec.external_execution_required is True:
                self._handle_external_execution_update(run_messages=run_messages, tool=tool_exec)
                continue

            # Only the user-input flows remain
            if tool_exec.requires_user_input is not True:
                continue

            if tool_exec.tool_name == "get_user_input":
                # Agentic user input: the collected values become the tool's result
                self._handle_get_user_input_tool_update(run_messages=run_messages, tool=tool_exec)
            else:
                # Regular tool whose arguments were supplied by the user: run it now
                self._handle_user_input_update(tool=tool_exec)
                async for event in self._arun_tool(run_messages, tool_exec):
                    yield event
            tool_exec.requires_user_input = False
            tool_exec.answered = True

    def _update_run_response(self, model_response: ModelResponse, run_response: RunResponse, run_messages: RunMessages):
        """Copy the results of a (non-streaming) model call into ``run_response``.

        Updates, in order:
        - formatted tool calls;
        - content (parsed structured output or plain content);
        - thinking (redacted thinking is appended);
        - citations;
        - tools (appended), plus reasoning content from "think"/"analyze" tool calls;
        - response audio and generated image;
        - ``created_at``;
        - messages flagged ``add_to_agent_memory`` and the metrics aggregated from them.

        Args:
            model_response: The model output to copy from.
            run_response: The run response to update in place.
            run_messages: The run's messages, used to rebuild ``run_response.messages``/``metrics``.
        """
        # Format tool calls if they exist
        if model_response.tool_executions:
            run_response.formatted_tool_calls = format_tool_calls(model_response.tool_executions)

        # Handle structured outputs
        # NOTE(review): if response_model is set and parsed is not None, but
        # _model_should_return_structured_output() is False, content is not updated at all.
        # Confirm that this combination cannot occur.
        if self.response_model is not None and model_response.parsed is not None:
            # We get native structured outputs from the model
            if self._model_should_return_structured_output():
                # Update the run_response content with the structured output
                run_response.content = model_response.parsed
                # Update the run_response content_type with the structured output class name
                run_response.content_type = self.response_model.__name__
        else:
            # Update the run_response content with the model response content
            run_response.content = model_response.content

        # Update the run_response thinking with the model response thinking
        if model_response.thinking is not None:
            run_response.thinking = model_response.thinking
        # Redacted thinking is appended to any existing thinking rather than replacing it
        if model_response.redacted_thinking is not None:
            if run_response.thinking is None:
                run_response.thinking = model_response.redacted_thinking
            else:
                run_response.thinking += model_response.redacted_thinking

        # Update the run_response citations with the model response citations
        if model_response.citations is not None:
            run_response.citations = model_response.citations

        # Update the run_response tools with the model response tool_executions
        # (extended, so tools from earlier turns of the same run are kept)
        if model_response.tool_executions is not None:
            if run_response.tools is None:
                run_response.tools = model_response.tool_executions
            else:
                run_response.tools.extend(model_response.tool_executions)

            # For Reasoning/Thinking/Knowledge Tools update reasoning_content in RunResponse
            for tool_call in model_response.tool_executions:
                tool_name = tool_call.tool_name or ""
                if tool_name.lower() in ["think", "analyze"]:
                    tool_args = tool_call.tool_args or {}
                    self.update_reasoning_content_from_tool_call(tool_name, tool_args)

        # Update the run_response audio with the model response audio
        if model_response.audio is not None:
            run_response.response_audio = model_response.audio

        # Generated images are stored via add_image rather than directly on run_response here
        if model_response.image is not None:
            self.add_image(model_response.image)

        # Update the run_response created_at with the model response created_at
        run_response.created_at = model_response.created_at

        # Build a list of messages that should be added to the RunResponse
        messages_for_run_response = [m for m in run_messages.messages if m.add_to_agent_memory]
        # Update the RunResponse messages
        run_response.messages = messages_for_run_response
        # Update the RunResponse metrics
        run_response.metrics = self.aggregate_metrics_from_messages(messages_for_run_response)

    def _add_run_to_memory(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        session_id: str,
        index_of_last_user_message: int = 0,
    ):
        """Store a finished (or paused) run in the agent's memory.

        With ``AgentMemory``:
        - the system message is added;
        - the user message is added, followed by every message from
          ``run_messages.messages[index_of_last_user_message:]`` flagged ``add_to_agent_memory``;
        - an ``AgentRun`` is added, carrying the user message and any ``extra_messages``.

        With ``Memory``, the run response is added under ``session_id``.

        Args:
            run_response: The run to store.
            run_messages: The run's messages.
            session_id: Session the run belongs to (used by ``Memory``).
            index_of_last_user_message: Start index of the new messages in
                ``run_messages.messages`` (used by ``AgentMemory``).
        """
        # Type narrowing for static checkers only; typing.cast returns its argument unchanged.
        if isinstance(self.memory, AgentMemory):
            self.memory = cast(AgentMemory, self.memory)
        else:
            self.memory = cast(Memory, self.memory)

        if isinstance(self.memory, AgentMemory):
            # Add the system message to the memory
            if run_messages.system_message is not None:
                self.memory.add_system_message(
                    run_messages.system_message, system_message_role=self.system_message_role
                )

            # Build a list of messages that should be added to the AgentMemory
            messages_for_memory: List[Message] = (
                [run_messages.user_message] if run_messages.user_message is not None else []
            )
            # Add messages from messages_for_run after the last user message
            for _rm in run_messages.messages[index_of_last_user_message:]:
                if _rm.add_to_agent_memory:
                    messages_for_memory.append(_rm)
            if len(messages_for_memory) > 0:
                self.memory.add_messages(messages=messages_for_memory)

            # Create an AgentRun object to add to memory
            agent_run = AgentRun(response=run_response)
            agent_run.message = run_messages.user_message

            if run_messages.extra_messages is not None and len(run_messages.extra_messages) > 0:
                for _im in run_messages.extra_messages:
                    # Parse the message and convert to a Message object if possible.
                    # Dicts are validated into Message; other types are skipped.
                    mp = None
                    if isinstance(_im, Message):
                        mp = _im
                    elif isinstance(_im, dict):
                        try:
                            mp = Message(**_im)
                        except Exception as e:
                            log_warning(f"Failed to validate message: {e}")
                    else:
                        log_warning(f"Unsupported message type: {type(_im)}")
                        continue

                    # Add the message to the AgentRun
                    # (a dict that failed validation reaches the else-branch and is warned about twice)
                    if mp:
                        if agent_run.messages is None:
                            agent_run.messages = []
                        agent_run.messages.append(mp)
                    else:
                        log_warning("Unable to add message to memory")

            # Add AgentRun to memory
            self.memory.add_run(agent_run)

        elif isinstance(self.memory, Memory):
            # Add AgentRun to memory
            self.memory.add_run(session_id=session_id, run=run_response)

    def _set_session_metrics(self, run_messages: RunMessages):
        """Refresh ``self.session_metrics`` after a run.

        * ``AgentMemory``: recomputed from every message in ``self.memory.messages``.
        * ``Memory``: this run's message metrics are added to the running session total, or
          become the total if none has been recorded yet.

        Any other memory type leaves the metrics unchanged.
        """
        memory = self.memory
        if isinstance(memory, AgentMemory):
            self.session_metrics = self.calculate_metrics(memory.messages)
            return
        if not isinstance(memory, Memory):
            return

        if self.session_metrics is not None:
            # Accumulate this run into the session total
            self.session_metrics += self.calculate_metrics(run_messages.messages)
        else:
            # First run of the session: start the total from this run's metrics
            self.session_metrics = self.calculate_metrics(run_messages.messages)

    def _update_memory(
        self,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[RunResponseEvent]:
        """Update user memories and session summaries after a run (sync generator).

        With ``AgentMemory``:
        - memories are updated from the user message and then from each extra message, when
          ``create_user_memories`` and ``update_user_memories_after_run`` are both enabled;
        - the session summary is updated when ``create_session_summary`` and
          ``update_session_summary_after_run`` are both enabled.

        Memory update started/completed events are yielded around the user-message update only
        when ``stream_intermediate_steps`` is True.

        With ``Memory``, all work (and events) is delegated to ``_make_memories_and_summaries``.

        Args:
            run_messages: The run's messages.
            session_id: Session the run belongs to.
            user_id: Optional user the run belongs to.
            stream_intermediate_steps: Whether to yield memory update events.

        Yields:
            RunResponseEvent objects for memory updates.
        """
        self.run_response = cast(RunResponse, self.run_response)
        # Type narrowing for static checkers only; typing.cast returns its argument unchanged.
        if isinstance(self.memory, AgentMemory):
            self.memory = cast(AgentMemory, self.memory)
        else:
            self.memory = cast(Memory, self.memory)

        if isinstance(self.memory, AgentMemory):
            # Update the memories with the user message if needed
            if (
                self.memory.create_user_memories
                and self.memory.update_user_memories_after_run
                and run_messages.user_message is not None
            ):
                if stream_intermediate_steps:
                    yield self._handle_event(
                        create_memory_update_started_event(from_run_response=self.run_response), self.run_response
                    )

                self.memory.update_memory(input=run_messages.user_message.get_content_string())

                if stream_intermediate_steps:
                    yield self._handle_event(
                        create_memory_update_completed_event(from_run_response=self.run_response), self.run_response
                    )

            if run_messages.extra_messages is not None and len(run_messages.extra_messages) > 0:
                for _im in run_messages.extra_messages:
                    # Parse the message and convert to a Message object if possible
                    mp = None
                    if isinstance(_im, Message):
                        mp = _im
                    elif isinstance(_im, dict):
                        try:
                            mp = Message(**_im)
                        except Exception as e:
                            log_warning(f"Failed to validate message during memory update: {e}")
                    else:
                        log_warning(f"Unsupported message type during memory update: {type(_im)}")
                        continue

                    # Feed the extra message into the user memories (same enabling flags as above)
                    if mp:
                        if self.memory.create_user_memories and self.memory.update_user_memories_after_run:
                            self.memory.update_memory(input=mp.get_content_string())
                    else:
                        log_warning("Unable to add message to memory")

            # Update the session summary if needed
            if self.memory.create_session_summary and self.memory.update_session_summary_after_run:
                self.memory.update_summary()

        elif isinstance(self.memory, Memory):
            yield from self._make_memories_and_summaries(run_messages, session_id, user_id)  # type: ignore

    async def _aupdate_memory(
        self,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        stream_intermediate_steps: bool = False,
    ) -> AsyncIterator[RunResponseEvent]:
        """Asynchronously update the agent's memory after a run.

        With a legacy ``AgentMemory``, user memories are updated from the run's user message and
        from any ``extra_messages`` (when ``create_user_memories`` and
        ``update_user_memories_after_run`` are set), and the session summary is refreshed (when
        ``create_session_summary`` and ``update_session_summary_after_run`` are set).
        With a v2 ``Memory``, the work is delegated to ``_amake_memories_and_summaries``.

        Args:
            run_messages: Messages of the current run.
            session_id: Session the run belongs to (used by the v2 ``Memory`` path).
            user_id: Optional user the memories belong to (used by the v2 ``Memory`` path).
            stream_intermediate_steps: When True, yield memory-update started/completed events
                around the user-message memory update on the ``AgentMemory`` path.

        Yields:
            RunResponseEvent objects describing memory-update progress.
        """
        self.run_response = cast(RunResponse, self.run_response)

        if isinstance(self.memory, AgentMemory):
            # Update the memories with the user message if needed
            if (
                self.memory.create_user_memories
                and self.memory.update_user_memories_after_run
                and run_messages.user_message is not None
            ):
                # Bracket the update: "started" before it runs, "completed" after it finishes
                if stream_intermediate_steps:
                    yield self._handle_event(
                        create_memory_update_started_event(from_run_response=self.run_response), self.run_response
                    )

                await self.memory.aupdate_memory(input=run_messages.user_message.get_content_string())

                if stream_intermediate_steps:
                    yield self._handle_event(
                        create_memory_update_completed_event(from_run_response=self.run_response), self.run_response
                    )

            if run_messages.extra_messages is not None and len(run_messages.extra_messages) > 0:
                for _im in run_messages.extra_messages:
                    # Parse the message and convert to a Message object if possible
                    mp = None
                    if isinstance(_im, Message):
                        mp = _im
                    elif isinstance(_im, dict):
                        try:
                            mp = Message(**_im)
                        except Exception as e:
                            log_warning(f"Failed to validate message during memory update: {e}")
                    else:
                        log_warning(f"Unsupported message type: {type(_im)}")
                        continue

                    # Feed the parsed message into user memories when enabled
                    if mp:
                        if self.memory.create_user_memories and self.memory.update_user_memories_after_run:
                            await self.memory.aupdate_memory(input=mp.get_content_string())
                    else:
                        log_warning("Unable to add message to memory")

            # Update the session summary if needed
            if self.memory.create_session_summary and self.memory.update_session_summary_after_run:
                await self.memory.aupdate_summary()

        elif isinstance(self.memory, Memory):
            async for event in self._amake_memories_and_summaries(run_messages, session_id, user_id):  # type: ignore
                yield event

    def _handle_model_response_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[RunResponseEvent]:
        """Drive the synchronous model stream and translate it into run events.

        Every event produced by ``self.model.response_stream`` is handed to
        ``_handle_model_response_chunk``, which accumulates output into a local ``ModelResponse``
        and into ``run_response``. Model output streaming is turned off when the response has to be
        parsed into a structured model.

        Once the stream is exhausted:
          * if ``stream_intermediate_steps`` is set, reasoning was started, and
            ``run_response.extra_data`` holds reasoning steps, a reasoning-completed event is
            yielded (after reasoning metrics are recorded);
          * ``run_response.messages`` is set to the run messages flagged ``add_to_agent_memory``,
            and ``run_response.metrics`` is aggregated from them;
          * accumulated audio, if any, is stored on ``run_response.response_audio``.
        """
        self.model = cast(Model, self.model)

        reasoning_state = {"reasoning_started": False, "reasoning_time_taken": 0.0}
        accumulated = ModelResponse(content="")

        stream_model_response = not self.should_parse_structured_output
        if not stream_model_response:
            log_debug("Response model set, model response is not streamed.")

        chunk_stream = self.model.response_stream(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
            stream_model_response=stream_model_response,
        )
        for chunk in chunk_stream:
            yield from self._handle_model_response_chunk(
                run_response=run_response,
                model_response=accumulated,
                model_response_event=chunk,
                reasoning_state=reasoning_state,
                parse_structured_output=self.should_parse_structured_output,
                stream_intermediate_steps=stream_intermediate_steps,
            )

        # Close out reasoning, if any reasoning steps were recorded during the stream
        if stream_intermediate_steps and reasoning_state["reasoning_started"]:
            collected_steps: List[ReasoningStep] = []
            extra_data = run_response.extra_data if run_response else None
            if extra_data and hasattr(extra_data, "reasoning_steps"):
                collected_steps = cast(List[ReasoningStep], extra_data.reasoning_steps)

            if collected_steps:
                self._add_reasoning_metrics_to_extra_data(reasoning_state["reasoning_time_taken"])
                completed_event = create_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=collected_steps),
                    content_type=ReasoningSteps.__name__,
                )
                yield self._handle_event(completed_event, run_response)

        # Finalize the RunResponse: keep only messages meant for agent memory, then derive metrics
        kept_messages = [message for message in run_messages.messages if message.add_to_agent_memory]
        run_response.messages = kept_messages
        run_response.metrics = self.aggregate_metrics_from_messages(kept_messages)

        # Expose the full accumulated audio once streaming is done
        if accumulated.audio is not None:
            run_response.response_audio = accumulated.audio

    async def _ahandle_model_response_stream(
        self,
        run_response: RunResponse,
        run_messages: RunMessages,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> AsyncIterator[RunResponseEvent]:
        """Drive the asynchronous model stream and translate it into run events.

        Every event from ``self.model.aresponse_stream`` is handed to the (synchronous)
        ``_handle_model_response_chunk``, whose run events are re-yielded one by one. Model output
        streaming is turned off when the response has to be parsed into a structured model.

        Once the stream is exhausted:
          * if ``stream_intermediate_steps`` is set, reasoning was started, and
            ``run_response.extra_data`` holds reasoning steps, a reasoning-completed event is
            yielded (after reasoning metrics are recorded);
          * ``run_response.messages`` is set to the run messages flagged ``add_to_agent_memory``,
            and ``run_response.metrics`` is aggregated from them;
          * accumulated audio, if any, is stored on ``run_response.response_audio``.
        """
        self.model = cast(Model, self.model)

        reasoning_state = {"reasoning_started": False, "reasoning_time_taken": 0.0}
        accumulated = ModelResponse(content="")

        stream_model_response = not self.should_parse_structured_output
        if not stream_model_response:
            log_debug("Response model set, model response is not streamed.")

        chunk_stream = self.model.aresponse_stream(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
            stream_model_response=stream_model_response,
        )  # type: ignore

        async for chunk in chunk_stream:  # type: ignore
            chunk_events = self._handle_model_response_chunk(
                run_response=run_response,
                model_response=accumulated,
                model_response_event=chunk,
                reasoning_state=reasoning_state,
                parse_structured_output=self.should_parse_structured_output,
                stream_intermediate_steps=stream_intermediate_steps,
            )
            for run_event in chunk_events:
                yield run_event

        # Close out reasoning, if any reasoning steps were recorded during the stream
        if stream_intermediate_steps and reasoning_state["reasoning_started"]:
            collected_steps: List[ReasoningStep] = []
            extra_data = run_response.extra_data if run_response else None
            if extra_data and hasattr(extra_data, "reasoning_steps"):
                collected_steps = cast(List[ReasoningStep], extra_data.reasoning_steps)

            if collected_steps:
                self._add_reasoning_metrics_to_extra_data(reasoning_state["reasoning_time_taken"])
                completed_event = create_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=collected_steps),
                    content_type=ReasoningSteps.__name__,
                )
                yield self._handle_event(completed_event, run_response)

        # Finalize the RunResponse: keep only messages meant for agent memory, then derive metrics
        kept_messages = [message for message in run_messages.messages if message.add_to_agent_memory]
        run_response.messages = kept_messages
        run_response.metrics = self.aggregate_metrics_from_messages(kept_messages)

        # Expose the full accumulated audio once streaming is done
        if accumulated.audio is not None:
            run_response.response_audio = accumulated.audio

    def _handle_model_response_chunk(
        self,
        run_response: RunResponse,
        model_response: ModelResponse,
        model_response_event: Union[ModelResponse, RunResponseEvent, TeamRunResponseEvent],
        reasoning_state: Optional[Dict[str, Any]] = None,
        parse_structured_output: bool = False,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[RunResponseEvent]:
        """Translate one event from the model stream into zero or more run events.

        Events that are members of the ``RunResponseEvent`` or ``TeamRunResponseEvent`` unions are
        forwarded unchanged through ``_handle_event``. Any other event is treated as a
        ``ModelResponse`` and dispatched on its ``event`` field:

        - ``assistant_response``: accumulates content, thinking, reasoning content, redacted
          thinking and audio into ``model_response``, mirrors them onto ``run_response``, and
          yields content events (also for citations and images).
        - ``tool_call_paused``: appends the paused tool executions to ``run_response.tools`` and
          refreshes ``run_response.formatted_tool_calls``; no event is yielded.
        - ``tool_call_started``: appends the tool executions to ``run_response.tools`` and yields
          a tool-call-started event per tool.
        - ``tool_call_completed``: replaces stored tool executions that share a ``tool_call_id``,
          yields a tool-call-completed event per tool, and for ``think``/``analyze`` tools records
          a reasoning step and adds the tool's measured time to ``reasoning_state``.

        Args:
            run_response: Run response being built; mutated in place.
            model_response: Accumulator shared across all chunks of one stream; mutated in place.
            model_response_event: The event received from the model stream.
            reasoning_state: Mutable dict with ``"reasoning_started"`` and
                ``"reasoning_time_taken"`` shared across chunks; may be None.
            parse_structured_output: When True, a content chunk replaces the accumulated content
                and is passed through ``_convert_response_to_structured_format``.
            stream_intermediate_steps: When True, yield reasoning started/step events.

        Yields:
            RunResponseEvent objects returned by ``_handle_event``.
        """
        if isinstance(model_response_event, tuple(get_args(RunResponseEvent))) or isinstance(
            model_response_event, tuple(get_args(TeamRunResponseEvent))
        ):
            # We just bubble the event up
            yield self._handle_event(model_response_event, run_response)  # type: ignore
        else:
            model_response_event = cast(ModelResponse, model_response_event)
            # If the model response is an assistant_response, yield a RunResponse
            if model_response_event.event == ModelResponseEvent.assistant_response.value:
                # Stays "str" unless structured output is parsed below; any other value selects
                # the full-content event further down.
                content_type = "str"

                # Process content and thinking
                if model_response_event.content is not None:
                    if parse_structured_output:
                        # Structured output: the chunk's content replaces (rather than extends) the
                        # accumulated content before conversion.
                        model_response.content = model_response_event.content
                        self._convert_response_to_structured_format(model_response)

                        content_type = self.response_model.__name__  # type: ignore
                        run_response.content = model_response.content
                        run_response.content_type = content_type
                    else:
                        # Plain text: append this chunk's delta to the accumulated content
                        model_response.content = (model_response.content or "") + model_response_event.content
                        run_response.content = model_response.content
                        run_response.content_type = "str"

                if model_response_event.thinking is not None:
                    model_response.thinking = (model_response.thinking or "") + model_response_event.thinking
                    run_response.thinking = model_response.thinking

                if model_response_event.reasoning_content is not None:
                    model_response.reasoning_content = (
                        model_response.reasoning_content or ""
                    ) + model_response_event.reasoning_content
                    run_response.reasoning_content = model_response.reasoning_content

                if model_response_event.redacted_thinking is not None:
                    model_response.redacted_thinking = (
                        model_response.redacted_thinking or ""
                    ) + model_response_event.redacted_thinking

                    # We only have thinking on response
                    # NOTE: this overwrites any run_response.thinking set from plain thinking above.
                    run_response.thinking = model_response.redacted_thinking

                if model_response_event.citations is not None:
                    # We get citations in one chunk
                    run_response.citations = model_response_event.citations

                # Only yield if we have content to show
                # Structured output yields the whole converted content; plain text yields only
                # this chunk's deltas.
                if content_type != "str":
                    yield self._handle_event(
                        create_run_response_content_event(
                            from_run_response=run_response,
                            content=model_response.content,
                            content_type=content_type,
                        ),
                        run_response,
                    )
                elif (
                    model_response_event.content is not None
                    or model_response_event.thinking is not None
                    or model_response_event.reasoning_content is not None
                    or model_response_event.redacted_thinking is not None
                    or model_response_event.citations is not None
                ):
                    yield self._handle_event(
                        create_run_response_content_event(
                            from_run_response=run_response,
                            content=model_response_event.content,
                            thinking=model_response_event.thinking,
                            reasoning_content=model_response_event.reasoning_content,
                            redacted_thinking=model_response_event.redacted_thinking,
                            citations=model_response_event.citations,
                        ),
                        run_response,
                    )

                # Process audio
                # The full stream is accumulated into model_response.audio, while
                # run_response.response_audio carries only this chunk and is yielded on its own.
                if model_response_event.audio is not None:
                    if model_response.audio is None:
                        model_response.audio = AudioResponse(id=str(uuid4()), content="", transcript="")

                    if model_response_event.audio.id is not None:
                        model_response.audio.id = model_response_event.audio.id  # type: ignore
                    if model_response_event.audio.content is not None:
                        model_response.audio.content += model_response_event.audio.content  # type: ignore
                    if model_response_event.audio.transcript is not None:
                        model_response.audio.transcript += model_response_event.audio.transcript  # type: ignore
                    if model_response_event.audio.expires_at is not None:
                        model_response.audio.expires_at = model_response_event.audio.expires_at  # type: ignore
                    if model_response_event.audio.mime_type is not None:
                        model_response.audio.mime_type = model_response_event.audio.mime_type  # type: ignore
                    # NOTE(review): unlike the fields above, these are copied unconditionally, so a
                    # chunk carrying None resets the accumulated values — confirm this is intended.
                    model_response.audio.sample_rate = model_response_event.audio.sample_rate
                    model_response.audio.channels = model_response_event.audio.channels

                    # Yield the audio and transcript bit by bit
                    run_response.response_audio = AudioResponse(
                        id=model_response_event.audio.id,
                        content=model_response_event.audio.content,
                        transcript=model_response_event.audio.transcript,
                        sample_rate=model_response_event.audio.sample_rate,
                        channels=model_response_event.audio.channels,
                    )
                    run_response.created_at = model_response_event.created_at

                    yield self._handle_event(
                        create_run_response_content_event(
                            from_run_response=run_response,
                            response_audio=run_response.response_audio,
                        ),
                        run_response,
                    )

                # Hand generated images to add_image and emit them as a content event
                if model_response_event.image is not None:
                    self.add_image(model_response_event.image)

                    yield self._handle_event(
                        create_run_response_content_event(
                            from_run_response=run_response,
                            image=model_response_event.image,
                        ),
                        run_response,
                    )

            # Handle tool interruption events
            # Paused tool executions are recorded on run_response only; no event is yielded here.
            elif model_response_event.event == ModelResponseEvent.tool_call_paused.value:
                # Add tool calls to the run_response
                tool_executions_list = model_response_event.tool_executions
                if tool_executions_list is not None:
                    # Add tool calls to the agent.run_response
                    if run_response.tools is None:
                        run_response.tools = tool_executions_list
                    else:
                        run_response.tools.extend(tool_executions_list)

                    # Format tool calls whenever new ones are added during streaming
                    run_response.formatted_tool_calls = format_tool_calls(run_response.tools)
            # If the model response is a tool_call_started, add the tool call to the run_response
            elif (
                model_response_event.event == ModelResponseEvent.tool_call_started.value
            ):  # Add tool calls to the run_response
                tool_executions_list = model_response_event.tool_executions
                if tool_executions_list is not None:
                    # Add tool calls to the agent.run_response
                    if run_response.tools is None:
                        run_response.tools = tool_executions_list
                    else:
                        run_response.tools.extend(tool_executions_list)

                    # Format tool calls whenever new ones are added during streaming
                    run_response.formatted_tool_calls = format_tool_calls(run_response.tools)

                    # Yield each tool call started event
                    for tool in tool_executions_list:
                        yield self._handle_event(
                            create_tool_call_started_event(from_run_response=run_response, tool=tool), run_response
                        )

            # If the model response is a tool_call_completed, update the existing tool call in the run_response
            elif model_response_event.event == ModelResponseEvent.tool_call_completed.value:
                reasoning_step: Optional[ReasoningStep] = None

                tool_executions_list = model_response_event.tool_executions
                if tool_executions_list is not None:
                    # Update the existing tool call in the run_response
                    if run_response.tools:
                        # Create a mapping of tool_call_id to index
                        # Completed executions replace the stored entry with the same tool_call_id;
                        # executions whose id is not already stored are not added.
                        tool_call_index_map = {
                            tc.tool_call_id: i for i, tc in enumerate(run_response.tools) if tc.tool_call_id is not None
                        }
                        # Process tool calls
                        for tool_call_dict in tool_executions_list:
                            tool_call_id = tool_call_dict.tool_call_id or ""
                            index = tool_call_index_map.get(tool_call_id)
                            if index is not None:
                                run_response.tools[index] = tool_call_dict
                    else:
                        # Nothing stored yet: take the completed executions as-is
                        run_response.tools = tool_executions_list

                    # Iterate over the tool executions reported in this event
                    for tool_call in tool_executions_list:
                        tool_name = tool_call.tool_name or ""
                        # think/analyze calls become reasoning steps; if several occur in one event,
                        # only the last step is kept in reasoning_step for the step event below.
                        if tool_name.lower() in ["think", "analyze"]:
                            tool_args = tool_call.tool_args or {}

                            reasoning_step = self.update_reasoning_content_from_tool_call(tool_name, tool_args)

                            # Add the tool's measured time to the reasoning time shared across chunks
                            metrics = tool_call.metrics
                            if metrics is not None and metrics.time is not None and reasoning_state is not None:
                                reasoning_state["reasoning_time_taken"] = reasoning_state[
                                    "reasoning_time_taken"
                                ] + float(metrics.time)
                        yield self._handle_event(
                            create_tool_call_completed_event(
                                from_run_response=run_response, tool=tool_call, content=model_response_event.content
                            ),
                            run_response,
                        )

                if stream_intermediate_steps:
                    if reasoning_step is not None:
                        # Emit reasoning-started once per stream; when reasoning_state is None or
                        # empty the started event is skipped but the step event is still sent.
                        if reasoning_state and not reasoning_state["reasoning_started"]:
                            yield self._handle_event(
                                create_reasoning_started_event(from_run_response=run_response), run_response
                            )
                            reasoning_state["reasoning_started"] = True

                        yield self._handle_event(
                            create_reasoning_step_event(
                                from_run_response=run_response,
                                reasoning_step=reasoning_step,
                                reasoning_content=run_response.reasoning_content or "",
                            ),
                            run_response,
                        )

    def create_run_response(
        self,
        content: Optional[Any] = None,
        *,
        session_id: Optional[str] = None,
        thinking: Optional[str] = None,
        redacted_thinking: Optional[str] = None,
        reasoning_content: Optional[str] = None,
        run_state: RunStatus = RunStatus.running,
        content_type: Optional[str] = None,
        created_at: Optional[int] = None,
        citations: Optional[Citations] = None,
        run_response: Optional[RunResponse] = None,
    ) -> RunResponse:
        """Build a fresh ``RunResponse`` for this agent's current run.

        When ``run_response`` is given, its model, messages, extra data, audio, images, videos,
        tools and response audio are carried over to the new object. Its ``content`` and
        ``content_type`` are used only when ``content`` is falsy. Its ``citations`` and
        ``reasoning_content`` are used only when the corresponding argument is ``None``, so
        explicitly passed values are never silently discarded.

        Args:
            content: Response content; falls back to ``run_response.content`` when falsy.
            session_id: Session the run belongs to.
            thinking: Thinking text; concatenated with ``redacted_thinking``.
            redacted_thinking: Redacted thinking text appended after ``thinking``.
            reasoning_content: Reasoning text; falls back to ``run_response.reasoning_content``.
            run_state: Status recorded on the new response.
            content_type: Content type; overridden by ``run_response.content_type`` when the
                content itself is taken from ``run_response``.
            created_at: Optional creation timestamp override.
            citations: Citations; fall back to ``run_response.citations``.
            run_response: Optional existing response to copy state from.

        Returns:
            A new ``RunResponse`` instance.
        """
        thinking_combined = (thinking or "") + (redacted_thinking or "")

        tools = None
        response_audio = None
        audio = None
        images = None
        videos = None
        model = None
        messages = None
        extra_data = None

        if run_response:
            model = run_response.model
            messages = run_response.messages
            extra_data = run_response.extra_data
            if not content:
                content = run_response.content
                content_type = run_response.content_type
            audio = run_response.audio
            images = run_response.images
            videos = run_response.videos
            response_audio = run_response.response_audio
            # Explicit arguments take precedence; run_response only fills in what was not passed
            if citations is None:
                citations = run_response.citations
            tools = run_response.tools
            if reasoning_content is None:
                reasoning_content = run_response.reasoning_content

        rr = RunResponse(
            run_id=self.run_id,
            status=run_state,
            session_id=session_id,
            team_session_id=self.team_session_id,
            agent_id=self.agent_id,
            agent_name=self.name,
            content=content,
            thinking=thinking_combined if thinking_combined else None,
            reasoning_content=reasoning_content,
            tools=tools,
            audio=audio,
            images=images,
            videos=videos,
            citations=citations,
            response_audio=response_audio,
        )
        if content_type is not None:
            rr.content_type = content_type
        if created_at is not None:
            rr.created_at = created_at
        if messages is not None:
            rr.messages = messages
        if extra_data is not None:
            rr.extra_data = extra_data
        if model is not None:
            rr.model = model
        return rr

    def _make_memories_and_summaries(
        self,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> Iterator[RunResponseEvent]:
        """Create user memories and a session summary using a small thread pool.

        Up to three jobs are submitted to a ``ThreadPoolExecutor`` (max 3 workers):
          * user memories from the run's user message (if ``enable_user_memories``);
          * user memories from the run's extra messages, after converting dicts to ``Message``
            (if ``enable_user_memories``);
          * a session summary (if ``enable_session_summaries``).
        The method waits for every job; a failing job is logged as a warning and does not affect
        the others.

        Yields:
            Memory-update started/completed events around the wait, when
            ``self.stream_intermediate_steps`` is set and at least one job was submitted.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        self.run_response = cast(RunResponse, self.run_response)
        self.memory = cast(Memory, self.memory)

        with ThreadPoolExecutor(max_workers=3) as pool:
            pending = []
            user_message = run_messages.user_message
            extra_messages = run_messages.extra_messages

            # Memories from the single user message
            if self.enable_user_memories and user_message is not None:
                log_debug("Creating user memories.")
                pending.append(
                    pool.submit(
                        self.memory.create_user_memories,
                        message=user_message.get_content_string(),
                        user_id=user_id,
                    )
                )

            # Memories from the extra messages, keeping only those that are (or parse into) Messages
            if self.enable_user_memories and extra_messages:
                valid_messages = []
                for raw in extra_messages:
                    if isinstance(raw, Message):
                        valid_messages.append(raw)
                        continue
                    if not isinstance(raw, dict):
                        log_warning(f"Unsupported message type: {type(raw)}")
                        continue
                    try:
                        valid_messages.append(Message(**raw))
                    except Exception as e:
                        log_warning(f"Failed to validate message during memory update: {e}")

                if valid_messages:
                    pending.append(
                        pool.submit(self.memory.create_user_memories, messages=valid_messages, user_id=user_id)
                    )
                else:
                    log_warning("Unable to add messages to memory")

            # Session summary
            if self.enable_session_summaries:
                log_debug("Creating session summary.")
                pending.append(
                    pool.submit(self.memory.create_session_summary, session_id=session_id, user_id=user_id)  # type: ignore
                )

            if not pending:
                return

            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_memory_update_started_event(from_run_response=self.run_response), self.run_response
                )

            # Wait for every job; a failure in one is logged and the rest still complete
            for finished in as_completed(pending):
                try:
                    finished.result()
                except Exception as e:
                    log_warning(f"Error in memory/summary operation: {str(e)}")

            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_memory_update_completed_event(from_run_response=self.run_response), self.run_response
                )

    async def _amake_memories_and_summaries(
        self,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> AsyncIterator[RunResponseEvent]:
        """Concurrently create user memories and a session summary.

        Up to three coroutines are gathered:
          * user memories from the run's user message (if ``enable_user_memories``);
          * user memories from the run's extra messages, after converting dicts to ``Message``
            (if ``enable_user_memories``);
          * a session summary (if ``enable_session_summaries``).
        All of them are awaited to completion. Each ``Exception`` raised by an operation is logged
        as a warning without affecting the others; other ``BaseException`` results (e.g.
        cancellation) are re-raised.

        Yields:
            Memory-update started/completed events around the gather, when
            ``self.stream_intermediate_steps`` is set and at least one operation was scheduled.
        """
        self.run_response = cast(RunResponse, self.run_response)
        self.memory = cast(Memory, self.memory)
        tasks = []

        # Create user memories from single message
        if self.enable_user_memories and run_messages.user_message is not None:
            log_debug("Creating user memories.")

            tasks.append(
                self.memory.acreate_user_memories(
                    message=run_messages.user_message.get_content_string(), user_id=user_id
                )
            )

        # Parse messages if provided
        if (
            self.enable_user_memories
            and run_messages.extra_messages is not None
            and len(run_messages.extra_messages) > 0
        ):
            parsed_messages = []
            for _im in run_messages.extra_messages:
                if isinstance(_im, Message):
                    parsed_messages.append(_im)
                elif isinstance(_im, dict):
                    try:
                        parsed_messages.append(Message(**_im))
                    except Exception as e:
                        log_warning(f"Failed to validate message during memory update: {e}")
                else:
                    log_warning(f"Unsupported message type: {type(_im)}")
                    continue

            if len(parsed_messages) > 0:
                tasks.append(self.memory.acreate_user_memories(messages=parsed_messages, user_id=user_id))
            else:
                log_warning("Unable to add messages to memory")

        # Create session summary
        if self.enable_session_summaries:
            log_debug("Creating session summary.")
            tasks.append(self.memory.acreate_session_summary(session_id=session_id, user_id=user_id))  # type: ignore

        if tasks:
            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_memory_update_started_event(from_run_response=self.run_response), self.run_response
                )

            # return_exceptions=True makes gather wait for every operation and report each failure.
            # Without it, the first failure propagates immediately while the remaining operations
            # keep running unobserved (their errors are lost and "completed" is emitted too early).
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for result in results:
                if isinstance(result, Exception):
                    log_warning(f"Error in memory/summary operation: {str(result)}")
                elif isinstance(result, BaseException):
                    # Non-Exception errors (e.g. cancellation) still propagate, as before
                    raise result

            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_memory_update_completed_event(from_run_response=self.run_response), self.run_response
                )

    def _raise_if_async_tools(self) -> None:
        """Raise an exception if any configured tool is an async function.

        Checks the entrypoint of every function in each ``Toolkit``, the entrypoint of each
        ``Function``, and any other callable in ``self.tools``. Async tools cannot be used with
        the synchronous ``agent.run()`` / ``agent.print_response()`` entry points.

        Raises:
            Exception: If an async tool is found; the message points to ``agent.arun()`` /
                ``agent.aprint_response()``.
        """
        if self.tools is None:
            return

        from inspect import iscoroutinefunction

        for tool in self.tools:
            if isinstance(tool, Toolkit):
                for func in tool.functions.values():
                    if iscoroutinefunction(func.entrypoint):
                        raise Exception(
                            f"Async tool {tool.name} can't be used with synchronous agent.run() or agent.print_response(). "
                            "Use agent.arun() or agent.aprint_response() instead to use this tool."
                        )
            elif isinstance(tool, Function):
                if iscoroutinefunction(tool.entrypoint):
                    raise Exception(
                        f"Async function {tool.name} can't be used with synchronous agent.run() or agent.print_response(). "
                        "Use agent.arun() or agent.aprint_response() instead to use this tool."
                    )
            elif callable(tool):
                if iscoroutinefunction(tool):
                    # iscoroutinefunction() is also True for functools.partial objects wrapping an
                    # async function, and those have no __name__; fall back to repr() so the
                    # intended error is raised instead of an AttributeError.
                    tool_name = getattr(tool, "__name__", repr(tool))
                    raise Exception(
                        f"Async function {tool_name} can't be used with synchronous agent.run() or agent.print_response(). "
                        "Use agent.arun() or agent.aprint_response() instead to use this tool."
                    )

    def get_tools(
        self,
        session_id: str,
        async_mode: bool = False,
        user_id: Optional[str] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Optional[List[Union[Toolkit, Callable, Function, Dict]]]:
        """Collect every tool the Agent exposes for this run.

        Combines the user-provided ``self.tools`` with the built-in tools enabled by configuration: chat and
        tool-call history, previous-session search, memory updates, knowledge search/update and team transfer
        functions. Most generated tools, and any user tool that takes an ``agent`` or ``team`` parameter, set
        ``self._rebuild_tools`` so that ``determine_tools_for_model`` rebuilds the tool list again next time.

        Args:
            session_id: Session the history and transfer tools are bound to.
            async_mode: Whether the run is asynchronous. In sync mode, async user tools are rejected.
            user_id: User the previous-session and agentic-memory tools are bound to.
            knowledge_filters: Filters forwarded to the knowledge search tool.

        Returns:
            The list of tools. It is never ``None``; the Optional return type is kept for compatibility.

        Raises:
            Exception: Propagated from ``_raise_if_async_tools`` when an async tool is used in sync mode.
        """
        agent_tools: List[Union[Toolkit, Callable, Function, Dict]] = []

        # Add provided tools
        if self.tools is not None:
            # If not running in async mode, raise if any tool is async
            if not async_mode:
                self._raise_if_async_tools()
            agent_tools.extend(self.tools)

            # A tool that takes an `agent` or `team` argument forces the tools to be rebuilt on every run
            if any(self._tool_needs_agent_or_team(tool) for tool in agent_tools):
                self._rebuild_tools = True

        # Add tools for accessing memory
        if self.read_chat_history:
            agent_tools.append(self.get_chat_history_function(session_id=session_id))
            self._rebuild_tools = True
        if self.read_tool_call_history:
            agent_tools.append(self.get_tool_call_history_function(session_id=session_id))
            self._rebuild_tools = True
        if self.search_previous_sessions_history:
            agent_tools.append(
                self.get_previous_sessions_messages_function(
                    num_history_sessions=self.num_history_sessions, user_id=user_id
                )
            )
            self._rebuild_tools = True

        if isinstance(self.memory, AgentMemory) and self.memory.create_user_memories:
            agent_tools.append(self.update_memory)
        elif isinstance(self.memory, Memory) and self.enable_agentic_memory:
            agent_tools.append(self.get_update_user_memory_function(user_id=user_id, async_mode=async_mode))
            self._rebuild_tools = True

        # Add tools for accessing knowledge
        if self.knowledge is not None or self.retriever is not None:
            # Check if retriever is an async function but used in sync mode
            from inspect import iscoroutinefunction

            if not async_mode and self.retriever and iscoroutinefunction(self.retriever):
                log_warning(
                    "Async retriever function is being used with synchronous agent.run() or agent.print_response(). "
                    "It is recommended to use agent.arun() or agent.aprint_response() instead."
                )

            if self.search_knowledge:
                # Use async or sync search based on async_mode
                if self.enable_agentic_knowledge_filters:
                    agent_tools.append(
                        self.search_knowledge_base_with_agentic_filters_function(
                            async_mode=async_mode, knowledge_filters=knowledge_filters
                        )
                    )
                else:
                    agent_tools.append(
                        self.search_knowledge_base_function(async_mode=async_mode, knowledge_filters=knowledge_filters)
                    )
                self._rebuild_tools = True

            if self.update_knowledge:
                agent_tools.append(self.add_to_knowledge)

        # Add transfer tools
        if self.has_team and self.team is not None:
            for agent_index, agent in enumerate(self.team):
                agent_tools.append(self.get_transfer_function(agent, agent_index, session_id))
            self._rebuild_tools = True

        return agent_tools

    @staticmethod
    def _tool_needs_agent_or_team(tool: Any) -> bool:
        """Return True if ``tool`` (Function, Toolkit or plain callable) declares an ``agent``/``team`` parameter."""
        injected = ("agent", "team")
        if isinstance(tool, Function):
            return any(name in tool.parameters for name in injected)
        if isinstance(tool, Toolkit):
            return any(name in func.parameters for func in tool.functions.values() for name in injected)
        if callable(tool):
            from inspect import signature

            try:
                params = signature(tool).parameters
            except (TypeError, ValueError):
                # Some callables (e.g. certain builtins / C extensions) expose no inspectable signature;
                # treat them as not needing the agent rather than aborting tool collection.
                return False
            return any(name in params for name in injected)
        # Dict (provider builtin) tools take no injected arguments
        return False

    def determine_tools_for_model(
        self,
        model: Model,
        session_id: str,
        user_id: Optional[str] = None,
        async_mode: bool = False,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Rebuild the model-facing tool definitions when they are marked stale.

        This is a no-op unless ``self._rebuild_tools`` is set. Otherwise it clears the flag, gathers tools via
        ``get_tools`` and repopulates three attributes:

        - ``self._tools_for_model``: the tool schemas sent to the model.
        - ``self._functions_for_model``: the name -> Function mapping used to execute calls.
        - ``self._tool_instructions``: instructions contributed by toolkits/functions.

        A function whose name is already registered is skipped.
        """
        if not self._rebuild_tools:
            return
        # Clear the flag *before* collecting: get_tools() may set it again for tools that must be rebuilt per run
        self._rebuild_tools = False

        collected = self.get_tools(
            session_id=session_id, async_mode=async_mode, user_id=user_id, knowledge_filters=knowledge_filters
        )

        self._tools_for_model = []
        self._functions_for_model = {}
        self._tool_instructions = []

        if not collected:
            return

        log_debug("Processing tools for model")

        # Functions are registered in strict mode only when the response model will be produced through the
        # model's native structured outputs.
        strict = bool(
            self.response_model is not None
            and (self.structured_outputs or (not self.use_json_mode))
            and model.supports_native_structured_outputs
        )

        def _attach(fn: Function, registered_name: str) -> None:
            # Bind the Function to this agent, build its schema and register it for execution
            fn._agent = self
            fn.process_entrypoint(strict=strict)
            if strict and fn.strict is None:
                fn.strict = True
            if self.tool_hooks is not None:
                fn.tool_hooks = self.tool_hooks
            self._functions_for_model[registered_name] = fn
            self._tools_for_model.append({"type": "function", "function": fn.to_dict()})

        for tool in collected:
            if isinstance(tool, Dict):
                # A dict is a provider builtin tool: sent as-is and executed by the provider, not the Agent
                self._tools_for_model.append(tool)
                log_debug(f"Included builtin tool {tool}")
            elif isinstance(tool, Toolkit):
                for name, func in tool.functions.items():
                    if name in self._functions_for_model:
                        continue
                    _attach(func, name)
                    log_debug(f"Added tool {name} from {tool.name}")
                if tool.add_instructions and tool.instructions is not None:
                    self._tool_instructions.append(tool.instructions)
            elif isinstance(tool, Function):
                if tool.name not in self._functions_for_model:
                    _attach(tool, tool.name)
                    log_debug(f"Added tool {tool.name}")
                # Instructions are collected even when the Function name was already registered
                if tool.add_instructions and tool.instructions is not None:
                    self._tool_instructions.append(tool.instructions)
            elif callable(tool):
                try:
                    if tool.__name__ not in self._functions_for_model:
                        wrapped = Function.from_callable(tool, strict=strict)
                        wrapped._agent = self
                        if strict:
                            wrapped.strict = True
                        if self.tool_hooks is not None:
                            wrapped.tool_hooks = self.tool_hooks
                        self._functions_for_model[wrapped.name] = wrapped
                        self._tools_for_model.append({"type": "function", "function": wrapped.to_dict()})
                        log_debug(f"Added tool {wrapped.name}")
                except Exception as e:
                    # e.g. callables without __name__ or that Function.from_callable cannot wrap
                    log_warning(f"Could not add tool {tool}: {e}")

    def _model_should_return_structured_output(self):
        """Return True when the response model is produced through the model's native structured outputs.

        That requires a model that supports native structured outputs, a ``response_model``, and either
        JSON mode disabled or ``structured_outputs`` enabled.
        """
        self.model = cast(Model, self.model)
        if not self.model.supports_native_structured_outputs:
            return False
        if self.response_model is None:
            return False
        return bool(not self.use_json_mode or self.structured_outputs)

    def _get_response_format(self, model: Optional[Model] = None) -> Optional[Union[Dict, Type[BaseModel]]]:
        """Choose the ``response_format`` to send to the model for ``self.response_model``.

        Args:
            model: Model to decide for; defaults to ``self.model``.

        Returns:
            - ``None`` when no response model is set.
            - For models with native structured outputs: the response model class itself, or a
              ``{"type": "json_object"}`` dict when JSON mode is on and structured outputs are off.
            - For models with JSON-schema outputs: a ``json_schema`` dict built from the Pydantic schema, or
              ``None`` when structured outputs are on and JSON mode is off.
            - Otherwise, ``{"type": "json_object"}``.
        """
        model = cast(Model, model or self.model)
        if self.response_model is None:
            return None

        if model.supports_native_structured_outputs:
            if self.use_json_mode and not self.structured_outputs:
                log_debug("Model supports native structured outputs but it is not enabled. Using JSON mode instead.")
                return {"type": "json_object"}
            log_debug("Setting Model.response_format to Agent.response_model")
            return self.response_model

        if model.supports_json_schema_outputs:
            if self.structured_outputs and not self.use_json_mode:
                return None
            log_debug("Setting Model.response_format to JSON response mode")
            schema_spec = {
                "name": self.response_model.__name__,
                "schema": self.response_model.model_json_schema(),
            }
            return {"type": "json_schema", "json_schema": schema_spec}

        log_debug("Model does not support structured or JSON schema outputs.")
        return {"type": "json_object"}

    def resolve_run_context(self) -> None:
        """Replace callable entries of ``self.context`` with the values they return.

        A callable is invoked as ``value(agent=self)`` when its signature has an ``agent`` parameter, and with
        no arguments otherwise. A ``None`` result leaves the callable in place. A failure is logged for that key
        and does not stop the remaining keys. A non-dict context is logged and left untouched.
        """
        from inspect import signature

        log_debug("Resolving context")
        context = self.context
        if not isinstance(context, dict):
            log_warning("Context is not a dict")
            return

        for key, value in context.items():
            if not callable(value):
                context[key] = value
                continue
            try:
                wants_agent = "agent" in signature(value).parameters
                resolved = value(agent=self) if wants_agent else value()
                if resolved is not None:
                    context[key] = resolved
            except Exception as e:
                log_warning(f"Failed to resolve context for '{key}': {e}")

    async def aresolve_run_context(self) -> None:
        """Async counterpart of ``resolve_run_context``.

        Each callable entry of ``self.context`` is called, with ``agent=self`` when its signature accepts an
        ``agent`` parameter. If the call returns a coroutine, it is awaited. Unlike the sync version, the result
        is stored even when it is ``None``. A failure is logged for that key and does not stop the others.
        """
        from inspect import iscoroutine, signature

        log_debug("Resolving context (async)")
        context = self.context
        if not isinstance(context, dict):
            log_warning("Context is not a dict")
            return

        for key, value in context.items():
            if callable(value):
                try:
                    params = signature(value).parameters
                    outcome = value(agent=self) if "agent" in params else value()
                    if iscoroutine(outcome):
                        outcome = await outcome
                    context[key] = outcome
                except Exception as e:
                    log_warning(f"Failed to resolve context for '{key}': {e}")
            else:
                context[key] = value

    def get_agent_data(self) -> Dict[str, Any]:
        """Return the agent fields persisted with a session.

        Only set (non-``None``) values are included: ``name``, ``agent_id`` and ``model`` (via ``to_dict()``).
        """
        data: Dict[str, Any] = {}
        for key, value in (("name", self.name), ("agent_id", self.agent_id)):
            if value is not None:
                data[key] = value
        model = self.model
        if model is not None:
            data["model"] = model.to_dict()
        return data

    def get_session_data(self) -> Dict[str, Any]:
        """Return the session-level data persisted with a session.

        Fields are included only when set:

        - ``session_name`` when it is not ``None``.
        - The three state dicts when non-empty.
        - ``session_metrics`` serialized with ``dataclasses.asdict``.
        - ``team_data`` when it is not ``None``.
        - ``images``/``videos``/``audio`` serialized via each artifact's ``to_dict()``.
        """
        data: Dict[str, Any] = {}
        if self.session_name is not None:
            data["session_name"] = self.session_name

        # State dicts are only persisted when they actually hold something
        state_fields = (
            ("session_state", self.session_state),
            ("team_session_state", self.team_session_state),
            ("workflow_session_state", self.workflow_session_state),
        )
        for key, state in state_fields:
            if state is not None and len(state) > 0:
                data[key] = state

        if self.session_metrics is not None:
            data["session_metrics"] = asdict(self.session_metrics)
        if self.team_data is not None:
            data["team_data"] = self.team_data

        for key, artifacts in (("images", self.images), ("videos", self.videos), ("audio", self.audio)):
            if artifacts is not None:
                data[key] = [artifact.to_dict() for artifact in artifacts]  # type: ignore
        return data

    def get_agent_session(self, session_id: str, user_id: Optional[str] = None) -> AgentSession:
        """Get an AgentSession object, which can be saved to the database.

        Only the runs belonging to ``session_id`` are serialized with the memory, not every run held in memory.

        Args:
            session_id: Session to snapshot.
            user_id: User to record on the session.

        Returns:
            AgentSession: A snapshot with memory, agent data, session data and extra data. ``created_at`` is
            the current Unix time in whole seconds.
        """
        from time import time

        memory_dict: Optional[Dict[str, Any]]
        if self.memory is None:
            memory_dict = None
        elif isinstance(self.memory, AgentMemory):
            memory_dict = self.memory.to_dict()
            # We only persist the runs for the current session ID (not all runs in memory)
            memory_dict["runs"] = [
                agent_run.to_dict()
                for agent_run in self.memory.runs
                if agent_run.response is not None and agent_run.response.session_id == session_id
            ]
        else:
            memory = cast(Memory, self.memory)
            # v2 Memory keeps runs keyed by session_id; store this session's runs under "runs" so the stored
            # structure matches the legacy AgentMemory layout. `runs` can be None, so fall back to empty.
            run_responses = (memory.runs or {}).get(session_id, [])  # type: ignore
            memory_dict = memory.to_dict()
            memory_dict["runs"] = [rr.to_dict() for rr in run_responses]

        return AgentSession(
            session_id=session_id,
            agent_id=cast(str, self.agent_id),
            user_id=user_id,
            team_session_id=cast(str, self.team_session_id),
            workflow_session_id=cast(str, self.workflow_session_id),
            memory=memory_dict,
            agent_data=self.get_agent_data(),
            session_data=self.get_session_data(),
            extra_data=self.extra_data,
            created_at=int(time()),
        )

    def load_agent_session(self, session: AgentSession):
        """Load the existing Agent from an AgentSession (from the database).

        - Identity fields already set on the Agent are kept: agent_id, user_id, session_id, name and
          session_name. Stored values only fill the gaps.
        - Session, team and workflow state are merged with the stored state.
        - Stored media artifacts are appended.
        - extra_data is merged.
        - The stored memory (runs, messages, summaries, user memories) is restored into whichever memory
          implementation the Agent uses.

        Args:
            session: The AgentSession read from storage.

        Raises:
            TypeError: If, after falling back to ``session.memory``, the Agent's memory is neither an
                ``AgentMemory``/``Memory`` instance nor a dict.
        """

        if not hasattr(session, "memory"):
            return

        from agno.utils.merge_dict import merge_dictionaries

        def _merge_state(current: Optional[Dict[str, Any]], from_db: Any) -> Optional[Dict[str, Any]]:
            # Combine an in-memory state dict with the stored one and return the state to keep.
            # Missing, empty or non-dict stored values leave the current state unchanged.
            if not isinstance(from_db, dict) or len(from_db) == 0:
                return current
            if current is not None and len(current) > 0:
                # merge_dictionaries updates its first argument (`current`) in place;
                # on conflicting keys the values from the database take precedence
                merge_dictionaries(current, from_db)
                return current
            return from_db

        # Get the agent_id, user_id and session_id from the database (only if not already set)
        if self.agent_id is None and session.agent_id is not None:
            self.agent_id = session.agent_id
        if self.user_id is None and session.user_id is not None:
            self.user_id = session.user_id
        if self.session_id is None and session.session_id is not None:
            self.session_id = session.session_id

        # Read agent_data from the database
        if session.agent_data is not None:
            # Get name from database and update the agent name if not set
            if self.name is None and "name" in session.agent_data:
                self.name = session.agent_data.get("name")

        # Read session_data from the database
        if session.session_data is not None:
            session_data = session.session_data

            # Get the session_name from database and update the current session_name if not set
            if self.session_name is None and "session_name" in session_data:
                self.session_name = session_data.get("session_name")

            # Merge the session, team and workflow state from the database into the current state
            if "session_state" in session_data:
                self.session_state = _merge_state(self.session_state, session_data.get("session_state"))
            if "team_session_state" in session_data:
                self.team_session_state = _merge_state(
                    self.team_session_state, session_data.get("team_session_state")
                )
            if "workflow_session_state" in session_data:
                self.workflow_session_state = _merge_state(
                    self.workflow_session_state, session_data.get("workflow_session_state")
                )

            # Get the session_metrics from the database
            if "session_metrics" in session_data:
                session_metrics_from_db = session_data.get("session_metrics")
                if session_metrics_from_db is not None and isinstance(session_metrics_from_db, dict):
                    self.session_metrics = SessionMetrics(**session_metrics_from_db)

            # Get images, videos, and audios from the database (appended to any already on the Agent)
            if "images" in session_data:
                images_from_db = session_data.get("images")
                if images_from_db is not None and isinstance(images_from_db, list):
                    if self.images is None:
                        self.images = []
                    self.images.extend([ImageArtifact.model_validate(img) for img in images_from_db])
            if "videos" in session_data:
                videos_from_db = session_data.get("videos")
                if videos_from_db is not None and isinstance(videos_from_db, list):
                    if self.videos is None:
                        self.videos = []
                    self.videos.extend([VideoArtifact.model_validate(vid) for vid in videos_from_db])
            if "audio" in session_data:
                audio_from_db = session_data.get("audio")
                if audio_from_db is not None and isinstance(audio_from_db, list):
                    if self.audio is None:
                        self.audio = []
                    self.audio.extend([AudioArtifact.model_validate(aud) for aud in audio_from_db])

        # Read extra_data from the database
        if session.extra_data is not None:
            # If extra_data is set in the agent, update the database extra_data with the agent's extra_data
            if self.extra_data is not None:
                # Updates agent_session.extra_data in place
                merge_dictionaries(session.extra_data, self.extra_data)
            # Update the current extra_data with the extra_data from the database which is updated in place
            self.extra_data = session.extra_data

        # If we haven't instantiated the memory yet, set it to the memory from the database
        if self.memory is None:
            self.memory = session.memory  # type: ignore

        if not isinstance(self.memory, (AgentMemory, Memory)):
            if isinstance(self.memory, dict) and "create_user_memories" in self.memory:
                # A serialized legacy AgentMemory
                self.memory = AgentMemory(**self.memory)
            elif isinstance(self.memory, dict):
                # A serialized v2 Memory. Runs are restored separately below, so they are excluded here.
                # Build a filtered copy instead of popping: self.memory may be the very dict stored in
                # session.memory, and popping "runs" from it would raise KeyError when the key is absent and
                # would hide the runs from the restore step below.
                memory_kwargs = {key: value for key, value in self.memory.items() if key != "runs"}
                self.memory = Memory(**memory_kwargs)
            else:
                # NOTE(review): this also fires when both the Agent and the session have no memory (None)
                raise TypeError(f"Expected memory to be a dict or AgentMemory, but got {type(self.memory)}")

        if session.memory is not None:
            if isinstance(self.memory, AgentMemory):
                try:
                    if "runs" in session.memory:
                        try:
                            self.memory.runs = []
                            for run in session.memory["runs"]:
                                self.memory.runs.append(AgentRun.model_validate(run))
                        except Exception as e:
                            log_warning(f"Failed to load runs from memory: {e}")
                    if "messages" in session.memory:
                        try:
                            self.memory.messages = [Message.model_validate(m) for m in session.memory["messages"]]
                        except Exception as e:
                            log_warning(f"Failed to load messages from memory: {e}")
                    if "summary" in session.memory:
                        from agno.memory.summary import SessionSummary

                        try:
                            self.memory.summary = SessionSummary.model_validate(session.memory["summary"])
                        except Exception as e:
                            log_warning(f"Failed to load session summary from memory: {e}")
                    if "memories" in session.memory:
                        from agno.memory.memory import Memory as AgentUserMemory

                        try:
                            self.memory.memories = [
                                AgentUserMemory.model_validate(m) for m in session.memory["memories"]
                            ]
                        except Exception as e:
                            log_warning(f"Failed to load user memories: {e}")
                    if self.memory.create_user_memories:
                        if self.user_id is not None and self.memory.user_id is None:
                            self.memory.user_id = self.user_id

                        self.memory.load_user_memories()
                        if self.user_id is not None:
                            log_debug(f"Memories loaded for user: {self.user_id}")
                        else:
                            log_debug("Memories loaded")
                except Exception as e:
                    log_warning(f"Failed to load AgentMemory: {e}")
            elif isinstance(self.memory, Memory):
                if "runs" in session.memory:
                    try:
                        if self.memory.runs is None:
                            self.memory.runs = {}
                        # Replace this session's runs with the stored ones
                        self.memory.runs[session.session_id] = []
                        for run in session.memory["runs"]:
                            # setdefault: a stored run may belong to a session with no entry yet
                            runs_for_session = self.memory.runs.setdefault(run["session_id"], [])
                            if "team_id" in run:
                                runs_for_session.append(TeamRunResponse.from_dict(run))
                            else:
                                runs_for_session.append(RunResponse.from_dict(run))
                    except Exception as e:
                        log_warning(f"Failed to load runs from memory: {e}")
                if "memories" in session.memory:
                    from agno.memory.v2.memory import UserMemory as UserMemoryV2

                    try:
                        # If memories are already loaded, use them as is for the current session.
                        # Otherwise, load them from the session memory (keyed by user_id, then memory_id).
                        if self.memory.memories is None:
                            self.memory.memories = {
                                user_id: {
                                    memory_id: UserMemoryV2.from_dict(memory)
                                    for memory_id, memory in user_memories.items()
                                }
                                for user_id, user_memories in session.memory["memories"].items()
                            }
                    except Exception as e:
                        log_warning(f"Failed to load user memories: {e}")
                if "summaries" in session.memory:
                    from agno.memory.v2.memory import SessionSummary as SessionSummaryV2

                    try:
                        # Summaries are keyed by user_id, then session_id; stored ones replace the current ones
                        self.memory.summaries = {
                            user_id: {
                                session_id: SessionSummaryV2.from_dict(summary)
                                for session_id, summary in user_session_summaries.items()
                            }
                            for user_id, user_session_summaries in session.memory["summaries"].items()
                        }
                    except Exception as e:
                        log_warning(f"Failed to load session summaries: {e}")
        log_debug(f"-*- AgentSession loaded: {session.session_id}")

    def read_from_storage(
        self,
        session_id: str,
    ) -> Optional[AgentSession]:
        """Read a session from storage into ``self.agent_session`` and load it into the Agent.

        Without storage nothing is read and the currently cached ``self.agent_session`` is returned. With
        storage, the read result, which may be ``None``, replaces the cached session.

        Args:
            session_id: The session_id to load from storage.

        Returns:
            Optional[AgentSession]: The loaded AgentSession or None if not found.
        """
        if self.storage is None:
            return self.agent_session

        self.agent_session = cast(AgentSession, self.storage.read(session_id=session_id))
        if self.agent_session is not None:
            self.load_agent_session(session=self.agent_session)
        return self.agent_session

    def refresh_from_storage(self, session_id: str) -> None:
        """Merge runs persisted in storage into the in-memory run history without duplicating them.

        Only v2 ``Memory`` is refreshed; a legacy ``AgentMemory`` is left untouched. A stored run whose
        ``run_id`` is already present for its session is skipped. Failures while loading are logged, not raised.

        Args:
            session_id: The session_id to refresh from storage.
        """
        if not self.storage:
            return

        agent_session_from_db = self.storage.read(session_id=session_id)  # type: ignore
        if (
            agent_session_from_db is None
            or agent_session_from_db.memory is None  # type: ignore
            or "runs" not in agent_session_from_db.memory  # type: ignore
        ):
            return
        if isinstance(self.memory, AgentMemory):
            return

        try:
            if self.memory.runs is None:  # type: ignore
                self.memory.runs = {}  # type: ignore
            runs_by_session = self.memory.runs  # type: ignore
            if session_id not in runs_by_session:
                runs_by_session[session_id] = []

            # run_ids already present, per session, so each stored run is checked in O(1)
            # instead of scanning that session's run list every time
            known_run_ids: Dict[str, Set[Any]] = {}
            for run in agent_session_from_db.memory["runs"]:  # type: ignore
                run_session_id = run["session_id"]
                # setdefault: a stored run may belong to a session that has no entry yet
                session_runs = runs_by_session.setdefault(run_session_id, [])
                if run_session_id not in known_run_ids:
                    known_run_ids[run_session_id] = {existing_run.run_id for existing_run in session_runs}
                seen = known_run_ids[run_session_id]

                run_id = run["run_id"]
                if run_id in seen:
                    continue
                if "team_id" in run:
                    session_runs.append(TeamRunResponse.from_dict(run))  # type: ignore
                else:
                    session_runs.append(RunResponse.from_dict(run))  # type: ignore
                # Also dedupe repeated run_ids within the stored list itself
                seen.add(run_id)
        except Exception as e:
            log_warning(f"Failed to load runs from memory: {e}")

    def write_to_storage(
        self, session_id: str, user_id: Optional[str] = None, refresh_session: Optional[bool] = False
    ) -> Optional[AgentSession]:
        """Upsert a snapshot of the current session into storage.

        Args:
            session_id: Session to save.
            user_id: User recorded on the saved session.
            refresh_session: When truthy, first merge the runs already in storage into memory
                (``refresh_from_storage``) before taking the snapshot.

        Returns:
            Optional[AgentSession]: ``self.agent_session``. This is the upserted session when storage is
            configured; otherwise it is whatever session was cached before.
        """
        storage = self.storage
        if storage is not None:
            if refresh_session:
                self.refresh_from_storage(session_id=session_id)
            snapshot = self.get_agent_session(session_id=session_id, user_id=user_id)
            self.agent_session = cast(AgentSession, storage.upsert(session=snapshot))

        # When session caching is disabled, drop this session's runs from memory after saving
        if not self.cache_session:
            memory = self.memory
            if memory is not None and memory.runs is not None and session_id in memory.runs:
                memory.runs.pop(session_id)  # type: ignore

        return self.agent_session

    def add_introduction(self, introduction: str) -> None:
        """Add an introduction to the chat history.

        Only applies to a legacy ``AgentMemory`` with no runs yet. The introduction is recorded as the
        Agent's first response (an assistant-role message). In every other case nothing happens.
        """
        if not isinstance(self.memory, AgentMemory) or introduction is None:
            return
        if len(self.memory.runs) != 0:
            return

        intro_message = Message(role=self.model.assistant_message_role, content=introduction)  # type: ignore
        self.memory.add_run(AgentRun(response=RunResponse(content=introduction, messages=[intro_message])))

    def load_session(self, force: bool = False) -> Optional[str]:
        """Load an existing session from the database and return the session_id.
        If a session does not exist, create a new session.

        - Unless ``force`` is set, an already-loaded ``agent_session`` matching ``self.session_id`` is reused.
        - Without storage, nothing is loaded or created and ``self.session_id`` (possibly ``None``) is returned.
        - If the session exists in storage, it is loaded.
        - Otherwise a new session is created. This initializes the agent if needed, generates a session_id
          when empty, seeds the introduction and writes the session to storage.

        Raises:
            Exception: If storage did not return a session after creating it.
        """
        cached = self.agent_session
        if not force and cached is not None and self.session_id is not None and cached.session_id == self.session_id:
            return cached.session_id

        if self.storage is None:
            return self.session_id

        log_debug(f"Reading AgentSession: {self.session_id}")
        self.read_from_storage(session_id=self.session_id)  # type: ignore
        if self.agent_session is not None:
            return self.session_id

        log_debug("-*- Creating new AgentSession")
        if self.agent_id is None:
            self.initialize_agent()
        if self.session_id in (None, ""):
            self.session_id = str(uuid4())
        if self.introduction is not None:
            self.add_introduction(self.introduction)
        # write_to_storage() builds the new AgentSession and stores the result in self.agent_session
        self.write_to_storage(user_id=self.user_id, session_id=self.session_id)  # type: ignore
        if self.agent_session is None:
            raise Exception("Failed to create new AgentSession in storage")
        log_debug(f"-*- Created AgentSession: {self.agent_session.session_id}")
        self._log_agent_session(user_id=self.user_id, session_id=self.session_id)  # type: ignore
        return self.session_id

    def new_session(self) -> None:
        """Start a fresh Agent session.

        Drops the loaded AgentSession and clears ``self.memory`` when it is an ``AgentMemory`` or
        ``Memory``. It then assigns a new random ``session_id`` and calls
        ``load_session(force=True)`` to load or create the session for that id.
        """
        self.agent_session = None

        current_memory = self.memory
        # Both memory flavours expose clear(); any other (or missing) memory is left alone
        if current_memory is not None and isinstance(current_memory, (AgentMemory, Memory)):
            current_memory.clear()

        self.session_id = str(uuid4())
        self.load_session(force=True)

    def format_message_with_state_variables(self, message: Any) -> Any:
        """Substitute ``{name}`` placeholders in a message with state values.

        Values are looked up, highest priority first, in ``session_state``,
        ``team_session_state``, ``workflow_session_state``, ``context``, ``extra_data`` and
        finally ``{"user_id": self.user_id}``. A placeholder is replaced with ``str(value)`` only
        when its name is a known key. Unknown placeholders and all other text, including any
        ``$`` characters, are left exactly as written.

        The substitution is a single pass over the message, so inserted values that themselves
        contain ``{...}`` are not expanded again.

        Args:
            message: The message to format. Non-string values are returned unchanged.

        Returns:
            Any: The formatted string. ``message`` itself is returned if it is not a string or if
            substitution fails.
        """
        import re

        if not isinstance(message, str):
            return message

        format_variables = ChainMap(
            self.session_state or {},
            self.team_session_state or {},
            self.workflow_session_state or {},
            self.context or {},
            self.extra_data or {},
            {"user_id": self.user_id} if self.user_id is not None else {},
        )

        def _substitute(match: "re.Match[str]") -> str:
            name = match.group(1)
            # Leave placeholders for unknown names untouched
            if name not in format_variables:
                return match.group(0)
            return str(format_variables[name])

        try:
            # `[^{}]+` only matches the innermost `{name}`, so `{{name}}` becomes `{value}`.
            # Using a callback (rather than string.Template) keeps `$`/`$$` in the text intact and
            # supports keys that are not Python identifiers.
            return re.sub(r"\{([^{}]+)\}", _substitute, message)
        except Exception as e:
            log_warning(f"Template substitution failed: {e}")
            return message

    def get_system_message(self, session_id: str, user_id: Optional[str] = None) -> Optional[Message]:
        """Return the system message for the Agent.

        1. If the system_message is provided, use that.
        2. If create_default_system_message is False, return None.
        3. Build and return the default system message for the Agent.

        Args:
            session_id: Session whose summary is looked up when ``self.memory`` is a ``Memory`` and
                ``add_session_summary_references`` is enabled.
            user_id: User whose memories/summary are looked up when ``self.memory`` is a ``Memory``.
                A falsy value falls back to ``"default"`` for those lookups.

        Returns:
            Optional[Message]: A provided ``Message`` is returned as-is. A provided string/callable
            yields a Message with role ``self.system_message_role``. The default message is stripped,
            and None is returned when it would be empty.

        Raises:
            Exception: If a callable ``system_message`` returns a non-string, or if the default
                message has to be built and ``self.model`` is None.
        """

        # 1. If the system_message is provided, use that.
        if self.system_message is not None:
            if isinstance(self.system_message, Message):
                return self.system_message

            sys_message_content: str = ""
            if isinstance(self.system_message, str):
                sys_message_content = self.system_message
            elif callable(self.system_message):
                # Callables receive the Agent as the `agent` keyword argument
                sys_message_content = self.system_message(agent=self)
                if not isinstance(sys_message_content, str):
                    raise Exception("system_message must return a string")

            # Format the system message with the session state variables
            if self.add_state_in_messages:
                sys_message_content = self.format_message_with_state_variables(sys_message_content)

            # Add the JSON output prompt if response_model is provided and the model does not support native structured outputs or JSON schema outputs
            # or if use_json_mode is True
            # (skipped entirely when a parser_model is configured)
            if (
                self.model is not None
                and self.parser_model is None
                and self.response_model is not None
                and not (
                    (self.model.supports_native_structured_outputs or self.model.supports_json_schema_outputs)
                    and (not self.use_json_mode or self.structured_outputs is True)
                )
            ):
                sys_message_content += f"\n{get_json_output_prompt(self.response_model)}"  # type: ignore

            # A provided system message bypasses all of the default sections built in step 3
            return Message(role=self.system_message_role, content=sys_message_content)
        # 2. If create_default_system_message is False, return None.
        if not self.create_default_system_message:
            return None

        if self.model is None:
            raise Exception("model not set")

        # 3. Build and return the default system message for the Agent.
        # 3.1 Build the list of instructions for the system message
        instructions: List[str] = []
        if self.instructions is not None:
            _instructions = self.instructions
            if callable(self.instructions):
                import inspect

                # Pass the Agent only if the callable declares an `agent` parameter
                signature = inspect.signature(self.instructions)
                if "agent" in signature.parameters:
                    _instructions = self.instructions(agent=self)
                else:
                    _instructions = self.instructions()

            # A single string becomes one instruction; a list contributes each element.
            # Any other return type is silently ignored.
            if isinstance(_instructions, str):
                instructions.append(_instructions)
            elif isinstance(_instructions, list):
                instructions.extend(_instructions)

        # 3.1.1 Add instructions from the Model
        _model_instructions = self.model.get_instructions_for_model(self._tools_for_model)
        if _model_instructions is not None:
            instructions.extend(_model_instructions)

        # 3.2 Build a list of additional information for the system message
        additional_information: List[str] = []
        # 3.2.1 Add instructions for using markdown
        if self.markdown and self.response_model is None:
            additional_information.append("Use markdown to format your answers.")
        # 3.2.2 Add the current datetime
        if self.add_datetime_to_instructions:
            from datetime import datetime

            tz = None

            if self.timezone_identifier:
                try:
                    from zoneinfo import ZoneInfo

                    tz = ZoneInfo(self.timezone_identifier)
                except Exception:
                    # Fall back to naive local time below
                    log_warning("Invalid timezone identifier")

            # Naive local time is used when no valid timezone_identifier was given
            time = datetime.now(tz) if tz else datetime.now()

            additional_information.append(f"The current time is {time}.")

        # 3.2.3 Add the current location
        if self.add_location_to_instructions:
            from agno.utils.location import get_location

            location = get_location()
            if location:
                # Only the non-empty parts of city/region/country are included
                location_str = ", ".join(
                    filter(None, [location.get("city"), location.get("region"), location.get("country")])
                )
                if location_str:
                    additional_information.append(f"Your approximate location is: {location_str}.")

        # 3.2.4 Add agent name if provided
        if self.name is not None and self.add_name_to_instructions:
            additional_information.append(f"Your name is: {self.name}.")

        # 3.2.5 Add information about agentic filters if enabled
        if self.knowledge is not None and self.enable_agentic_knowledge_filters:
            valid_filters = getattr(self.knowledge, "valid_metadata_filters", None)
            if valid_filters:
                valid_filters_str = ", ".join(valid_filters)
                # NOTE(review): the "Examples" list in this prompt is numbered 1, 2, 4 (there is no 3).
                # The doubled braces `{{ }}` render as literal braces in the f-string.
                additional_information.append(
                    dedent(f"""
                    The knowledge base contains documents with these metadata filters: {valid_filters_str}.
                    Always use filters when the user query indicates specific metadata.

                    Examples:
                    1. If the user asks about a specific person like "Jordan Mitchell", you MUST use the search_knowledge_base tool with the filters parameter set to {{'<valid key like user_id>': '<valid value based on the user query>'}}.
                    2. If the user asks about a specific document type like "contracts", you MUST use the search_knowledge_base tool with the filters parameter set to {{'document_type': 'contract'}}.
                    4. If the user asks about a specific location like "documents from New York", you MUST use the search_knowledge_base tool with the filters parameter set to {{'<valid key like location>': 'New York'}}.

                    General Guidelines:
                    - Always analyze the user query to identify relevant metadata.
                    - Use the most specific filter(s) possible to narrow down results.
                    - If multiple filters are relevant, combine them in the filters parameter (e.g., {{'name': 'Jordan Mitchell', 'document_type': 'contract'}}).
                    - Ensure the filter keys match the valid metadata filters: {valid_filters_str}.

                    You can use the search_knowledge_base tool to search the knowledge base and get the most relevant documents. Make sure to pass the filters as [Dict[str: Any]] to the tool. FOLLOW THIS STRUCTURE STRICTLY.
                """)
                )

        # 3.3 Build the default system message for the Agent.
        system_message_content: str = ""
        # 3.3.1 First add the Agent description if provided
        if self.description is not None:
            system_message_content += f"{self.description}\n"
        # 3.3.2 Then add the Agent goal if provided
        if self.goal is not None:
            system_message_content += f"\n<your_goal>\n{self.goal}\n</your_goal>\n\n"
        # 3.3.3 Then add the Agent role if provided
        if self.role is not None:
            system_message_content += f"\n<your_role>\n{self.role}\n</your_role>\n\n"
        # 3.3.4 Then add instructions for transferring tasks to team members
        if self.has_team and self.add_transfer_instructions:
            system_message_content += (
                "<agent_team>\n"
                "You are the leader of a team of AI Agents:\n"
                "- You can either respond directly or transfer tasks to other Agents in your team depending on the tools available to them.\n"
                "- If you transfer a task to another Agent, make sure to include:\n"
                "  - task_description (str): A clear description of the task.\n"
                "  - expected_output (str): The expected output.\n"
                "  - additional_information (str): Additional information that will help the Agent complete the task.\n"
                "- You must always validate the output of the other Agents before responding to the user.\n"
                "- You can re-assign the task if you are not satisfied with the result.\n"
                "</agent_team>\n\n"
            )
        # 3.3.5 Then add instructions for the Agent
        if len(instructions) > 0:
            system_message_content += "<instructions>"
            # Multiple instructions are rendered as a bullet list; a single one as plain text
            if len(instructions) > 1:
                for _upi in instructions:
                    system_message_content += f"\n- {_upi}"
            else:
                system_message_content += "\n" + instructions[0]
            system_message_content += "\n</instructions>\n\n"
        # 3.3.6 Add additional information
        if len(additional_information) > 0:
            system_message_content += "<additional_information>"
            for _ai in additional_information:
                system_message_content += f"\n- {_ai}"
            system_message_content += "\n</additional_information>\n\n"
        # 3.3.7 Then add instructions for the tools
        if self._tool_instructions is not None:
            for _ti in self._tool_instructions:
                system_message_content += f"{_ti}\n"

        # Format the system message with the session state variables.
        # NOTE: only the sections built so far (3.3.1-3.3.7) are formatted; everything appended
        # below is added verbatim.
        if self.add_state_in_messages:
            system_message_content = self.format_message_with_state_variables(system_message_content)

        # 3.3.8 Then add the expected output
        if self.expected_output is not None:
            system_message_content += f"<expected_output>\n{self.expected_output.strip()}\n</expected_output>\n\n"
        # 3.3.9 Then add additional context
        if self.additional_context is not None:
            system_message_content += f"{self.additional_context}\n"
        # 3.3.10 Then add information about the team members
        if self.has_team and self.add_transfer_instructions:
            system_message_content += (
                f"<transfer_instructions>\n{self.get_transfer_instructions().strip()}\n</transfer_instructions>\n\n"
            )
        # 3.3.11 Then add the success criteria
        if self.success_criteria:
            system_message_content += "Your task is successful when the following criteria is met:\n"
            system_message_content += "<success_criteria>\n"
            system_message_content += f"{self.success_criteria}\n"
            system_message_content += "</success_criteria>\n"
            system_message_content += "Stop running when the success_criteria is met.\n\n"
        # 3.3.12 Then add memories to the system prompt
        # (truthiness check on the memory object itself)
        if self.memory:
            # Legacy AgentMemory: memories are stored on the memory object itself
            if isinstance(self.memory, AgentMemory) and self.memory.create_user_memories:
                if self.memory.memories and len(self.memory.memories) > 0:
                    system_message_content += (
                        "You have access to memories from previous interactions with the user that you can use:\n\n"
                    )
                    system_message_content += "<memories_from_previous_interactions>"
                    for _memory in self.memory.memories:
                        system_message_content += f"\n- {_memory.memory}"
                    system_message_content += "\n</memories_from_previous_interactions>\n\n"
                    system_message_content += (
                        "Note: this information is from previous interactions and may be updated in this conversation. "
                        "You should always prefer information from this conversation over the past memories.\n\n"
                    )
                else:
                    system_message_content += (
                        "You have the capability to retain memories from previous interactions with the user, "
                        "but have not had any interactions with the user yet.\n"
                    )
                system_message_content += (
                    "You can add new memories using the `update_memory` tool.\n"
                    "If you use the `update_memory` tool, remember to pass on the response to the user.\n\n"
                )
            # Memory v2: memories are fetched per user
            elif isinstance(self.memory, Memory) and self.add_memory_references:
                # NOTE: this reassigns the user_id parameter, so the summary lookup below also
                # uses "default" when no user_id was given
                if not user_id:
                    user_id = "default"
                user_memories = self.memory.get_user_memories(user_id=user_id)  # type: ignore
                if user_memories and len(user_memories) > 0:
                    system_message_content += (
                        "You have access to memories from previous interactions with the user that you can use:\n\n"
                    )
                    system_message_content += "<memories_from_previous_interactions>"
                    for _memory in user_memories:  # type: ignore
                        system_message_content += f"\n- {_memory.memory}"
                    system_message_content += "\n</memories_from_previous_interactions>\n\n"
                    system_message_content += (
                        "Note: this information is from previous interactions and may be updated in this conversation. "
                        "You should always prefer information from this conversation over the past memories.\n"
                    )
                else:
                    system_message_content += (
                        "You have the capability to retain memories from previous interactions with the user, "
                        "but have not had any interactions with the user yet.\n"
                    )

                if self.enable_agentic_memory:
                    system_message_content += (
                        "\n<updating_user_memories>\n"
                        "- You have access to the `update_user_memory` tool that you can use to add new memories, update existing memories, delete memories, or clear all memories.\n"
                        "- If the user's message includes information that should be captured as a memory, use the `update_user_memory` tool to update your memory database.\n"
                        "- Memories should include details that could personalize ongoing interactions with the user.\n"
                        "- Use this tool to add new memories or update existing memories that you identify in the conversation.\n"
                        "- Use this tool if the user asks to update their memory, delete a memory, or clear all memories.\n"
                        "- If you use the `update_user_memory` tool, remember to pass on the response to the user.\n"
                        "</updating_user_memories>\n\n"
                    )

            # 3.3.13 Then add a summary of the interaction to the system prompt
            if isinstance(self.memory, AgentMemory) and self.memory.create_session_summary:
                if self.memory.summary is not None:
                    system_message_content += "Here is a brief summary of your previous interactions:\n\n"
                    system_message_content += "<summary_of_previous_interactions>\n"
                    system_message_content += str(self.memory.summary)
                    system_message_content += "\n</summary_of_previous_interactions>\n\n"
                    system_message_content += (
                        "Note: this information is from previous interactions and may be outdated. "
                        "You should ALWAYS prefer information from this conversation over the past summary.\n\n"
                    )
            elif isinstance(self.memory, Memory) and self.add_session_summary_references:
                if not user_id:
                    user_id = "default"
                # Summaries are looked up by user_id first, then by session_id
                session_summary: SessionSummary = self.memory.summaries.get(user_id, {}).get(session_id, None)  # type: ignore
                if session_summary is not None:
                    system_message_content += "Here is a brief summary of your previous interactions:\n\n"
                    system_message_content += "<summary_of_previous_interactions>\n"
                    system_message_content += session_summary.summary
                    system_message_content += "\n</summary_of_previous_interactions>\n\n"
                    system_message_content += (
                        "Note: this information is from previous interactions and may be outdated. "
                        "You should ALWAYS prefer information from this conversation over the past summary.\n\n"
                    )

        # 3.3.14 Add the system message from the Model
        system_message_from_model = self.model.get_system_message_for_model(self._tools_for_model)
        if system_message_from_model is not None:
            system_message_content += system_message_from_model

        # 3.3.15 Add the JSON output prompt if response_model is provided and the model does not support native structured outputs or JSON schema outputs
        # or if use_json_mode is True
        # (unlike step 1, no newline is inserted before the prompt here)
        if (
            self.response_model is not None
            and self.parser_model is None
            and not (
                (self.model.supports_native_structured_outputs or self.model.supports_json_schema_outputs)
                and (not self.use_json_mode or self.structured_outputs is True)
            )
        ):
            system_message_content += f"{get_json_output_prompt(self.response_model)}"  # type: ignore

        # 3.3.16 Add the response model format prompt if response_model is provided
        # (only when a parser_model is configured; the JSON prompt above is skipped in that case)
        if self.response_model is not None and self.parser_model is not None:
            system_message_content += f"{get_response_model_format_prompt(self.response_model)}"

        # Return the system message
        return (
            Message(role=self.system_message_role, content=system_message_content.strip())  # type: ignore
            if system_message_content
            else None
        )

    def get_user_message(
        self,
        *,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Optional[Message]:
        """Build the user Message for this run.

        Resolution order:
        1. ``self.user_message`` when set. A ``Message`` is returned unchanged. A string supplies
           the content, as does a callable (invoked with ``agent``, ``message`` and ``references``).
        2. When ``create_default_user_message`` is False, ``message`` is wrapped unchanged.
        3. A list ``message`` is flattened to text: newline-joined if every item is a string,
           otherwise ``str(message)``.
        4. Otherwise the default message is built from the message text. The knowledge-base
           references (when ``add_references``) and ``self.context`` (when ``add_context``) are
           appended after it.

        If ``add_references`` is enabled and ``message`` is truthy, references are retrieved up
        front. They are appended to ``self.run_response.extra_data.references`` regardless of
        which branch builds the final message. Retrieval failures are logged, not raised.

        Returns:
            Optional[Message]: The user message, or None when ``message`` is None and no media
            was passed.

        Raises:
            Exception: If ``add_references`` is enabled and ``message`` is neither a string nor a
                callable, or if a callable ``user_message`` returns a non-string.
        """
        self.run_response = cast(RunResponse, self.run_response)

        # Retrieve knowledge-base references before deciding how to build the message
        references = None
        if self.add_references and message:
            query: str
            if isinstance(message, str):
                query = message
            elif callable(message):
                query = message(agent=self)
            else:
                raise Exception("message must be a string or a callable when add_references is True")

            try:
                timer = Timer()
                timer.start()
                docs = self.get_relevant_docs_from_knowledge(query=query, filters=knowledge_filters, **kwargs)
                if docs is not None:
                    references = MessageReferences(query=query, references=docs, time=round(timer.elapsed, 4))
                    # Record the references on the run response
                    if self.run_response.extra_data is None:
                        self.run_response.extra_data = RunResponseExtraData()
                    extra_data = self.run_response.extra_data
                    if extra_data.references is None:
                        extra_data.references = []
                    extra_data.references.append(references)
                timer.stop()
                log_debug(f"Time to get references: {timer.elapsed:.4f}s")
            except Exception as e:
                log_warning(f"Failed to get references: {e}")

        # Media attachments are forwarded unchanged by every branch below
        media = {"audio": audio, "images": images, "videos": videos, "files": files}

        # 1. A configured user_message takes precedence over everything else
        if self.user_message is not None:
            if isinstance(self.user_message, Message):
                return self.user_message

            content = self.user_message
            if callable(content):
                content = content(agent=self, message=message, references=references)
                if not isinstance(content, str):
                    raise Exception("user_message must return a string")
            if self.add_state_in_messages:
                content = self.format_message_with_state_variables(content)
            return Message(role=self.user_message_role, content=content, **media, **kwargs)

        # 2. Default construction disabled: wrap the message exactly as received
        if not self.create_default_user_message:
            return Message(role=self.user_message_role, content=message, **media, **kwargs)

        # Lists are flattened into a single text payload
        if isinstance(message, list):
            if all(isinstance(part, str) for part in message):
                flattened = "\n".join(message)
            else:
                flattened = str(message)
            return Message(role=self.user_message_role, content=flattened, **media, **kwargs)

        # 3. Build the default user message
        if message is None:
            if all(item is None for item in (images, audio, videos, files)):
                return None
            # Media-only input: send it with empty text content
            return Message(role=self.user_message_role, content="", **media, **kwargs)

        formatted = self.format_message_with_state_variables(message) if self.add_state_in_messages else message
        text = "" if formatted is None else get_text_from_message(formatted)

        # 4.1 Append the knowledge-base references
        if (
            self.add_references
            and references is not None
            and references.references is not None
            and len(references.references) > 0
        ):
            text += (
                "\n\nUse the following references from the knowledge base if it helps:\n"
                + "<references>\n"
                + self.convert_documents_to_string(references.references)
                + "\n"
                + "</references>"
            )

        # 4.2 Append the additional context
        if self.add_context and self.context is not None:
            text += "\n\n<context>\n" + self.convert_context_to_string(self.context) + "\n" + "</context>"

        return Message(role=self.user_message_role, content=text, **media, **kwargs)

    def get_run_messages(
        self,
        *,
        message: Optional[Union[str, List, Dict, Message, BaseModel]] = None,
        session_id: str,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        messages: Optional[Sequence[Union[Dict, Message]]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> RunMessages:
        """This function returns a RunMessages object with the following attributes:
            - system_message: The system message for this run
            - user_message: The user message for this run
            - messages: List of messages to send to the model

        To build the RunMessages object:
        1. Add system message to run_messages
        2. Add extra messages to run_messages if provided
        3. Add history to run_messages
        4. Add user message to run_messages
        5. Add messages to run_messages if provided

        Returns:
            RunMessages object with the following attributes:
                - system_message: The system message for this run
                - user_message: The user message for this run
                - messages: List of all messages to send to the model

        Typical usage:
        run_messages = self.get_run_messages(
            message=message, session_id=session_id, user_id=user_id, audio=audio, images=images, videos=videos, files=files, messages=messages, **kwargs
        )
        """

        # Initialize the RunMessages object
        run_messages = RunMessages()
        self.run_response = cast(RunResponse, self.run_response)

        # 1. Add system message to run_messages
        system_message = self.get_system_message(session_id=session_id, user_id=user_id)
        if system_message is not None:
            run_messages.system_message = system_message
            run_messages.messages.append(system_message)

        # 2. Add extra messages to run_messages if provided
        if self.add_messages is not None:
            messages_to_add_to_run_response: List[Message] = []
            if run_messages.extra_messages is None:
                run_messages.extra_messages = []

            for _m in self.add_messages:
                if isinstance(_m, Message):
                    messages_to_add_to_run_response.append(_m)
                    run_messages.messages.append(_m)
                    run_messages.extra_messages.append(_m)
                elif isinstance(_m, dict):
                    try:
                        _m_parsed = Message.model_validate(_m)
                        messages_to_add_to_run_response.append(_m_parsed)
                        run_messages.messages.append(_m_parsed)
                        run_messages.extra_messages.append(_m_parsed)
                    except Exception as e:
                        log_warning(f"Failed to validate message: {e}")
            # Add the extra messages to the run_response
            if len(messages_to_add_to_run_response) > 0:
                log_debug(f"Adding {len(messages_to_add_to_run_response)} extra messages")
                if self.run_response.extra_data is None:
                    self.run_response.extra_data = RunResponseExtraData(add_messages=messages_to_add_to_run_response)
                else:
                    if self.run_response.extra_data.add_messages is None:
                        self.run_response.extra_data.add_messages = messages_to_add_to_run_response
                    else:
                        self.run_response.extra_data.add_messages.extend(messages_to_add_to_run_response)

        # 3. Add history to run_messages
        if self.add_history_to_messages:
            from copy import deepcopy

            history: List[Message] = []
            if isinstance(self.memory, AgentMemory):
                history = self.memory.get_messages_from_last_n_runs(
                    last_n=self.num_history_runs, skip_role=self.system_message_role
                )
            elif isinstance(self.memory, Memory):
                history = self.memory.get_messages_from_last_n_runs(
                    session_id=session_id,
                    last_n=self.num_history_runs,
                    skip_role=self.system_message_role,
                    # Only filter by agent_id if this is part of a team
                    agent_id=self.agent_id if self.team_session_id is not None else None,
                )

            if len(history) > 0:
                # Create a deep copy of the history messages to avoid modifying the original messages
                history_copy = [deepcopy(msg) for msg in history]

                # Tag each message as coming from history
                for _msg in history_copy:
                    _msg.from_history = True

                log_debug(f"Adding {len(history_copy)} messages from history")

                run_messages.messages += history_copy

        # 4. Add messages to run_messages if provided
        if messages is not None and len(messages) > 0:
            for _m in messages:
                if isinstance(_m, Message):
                    run_messages.messages.append(_m)
                    if run_messages.extra_messages is None:
                        run_messages.extra_messages = []
                    run_messages.extra_messages.append(_m)
                elif isinstance(_m, dict):
                    try:
                        run_messages.messages.append(Message.model_validate(_m))
                        if run_messages.extra_messages is None:
                            run_messages.extra_messages = []
                        run_messages.extra_messages.append(Message.model_validate(_m))
                    except Exception as e:
                        log_warning(f"Failed to validate message: {e}")

        # 5. Add user message to run_messages
        user_message: Optional[Message] = None
        # 5.1 Build user message if message is None, str or list
        if message is None or isinstance(message, str) or isinstance(message, list):
            user_message = self.get_user_message(
                message=message,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                knowledge_filters=knowledge_filters,
                **kwargs,
            )
        # 5.2 If message is provided as a Message, use it directly
        elif isinstance(message, Message):
            user_message = message
        # 5.3 If message is provided as a dict, try to validate it as a Message
        elif isinstance(message, dict):
            try:
                user_message = Message.model_validate(message)
            except Exception as e:
                log_warning(f"Failed to validate message: {e}")
        # 5.4 If message is provided as a BaseModel, convert it to a Message
        elif isinstance(message, BaseModel):
            try:
                # Create a user message with the BaseModel content
                content = message.model_dump_json(indent=2, exclude_none=True)
                user_message = Message(role=self.user_message_role, content=content)
            except Exception as e:
                log_warning(f"Failed to convert BaseModel to message: {e}")
        # Add user message to run_messages
        if user_message is not None:
            run_messages.user_message = user_message
            run_messages.messages.append(user_message)

        return run_messages

    def get_continue_run_messages(
        self,
        messages: List[Message],
    ) -> RunMessages:
        """Build a RunMessages object for resuming a paused run.

        The attributes are filled from ``messages`` (the paused run's messages):
            - system_message: the first message whose role is ``self.system_message_role``
            - user_message: the last message whose role is ``self.user_message_role``
            - messages: the ``messages`` list itself (not a copy)

        Used to continue a previous run and complete a tool call that was paused.
        """
        continued = RunMessages()

        # The most recent user message is treated as the run's original user message
        latest_user_message = next(
            (msg for msg in reversed(messages) if msg.role == self.user_message_role),
            None,
        )
        # The first system message found is treated as the run's system message
        first_system_message = next(
            (msg for msg in messages if msg.role == self.system_message_role),
            None,
        )

        continued.system_message = first_system_message
        continued.user_message = latest_user_message
        continued.messages = messages
        return continued

    def get_messages_for_parser_model(
        self, model_response: ModelResponse, response_format: Optional[Union[Dict, Type[BaseModel]]]
    ) -> List[Message]:
        """Build the [system, user] messages sent to the parser model.

        The system prompt is ``self.parser_model_prompt`` when set, otherwise a default
        instruction. When ``response_format`` requests a JSON object and
        ``self.response_model`` is set, the JSON output prompt for that model is appended.
        The user message carries ``model_response.content``.
        """
        default_prompt = "You are tasked with creating a structured output from the provided user message."
        system_content = default_prompt if self.parser_model_prompt is None else self.parser_model_prompt

        if response_format == {"type": "json_object"} and self.response_model is not None:
            json_prompt = get_json_output_prompt(self.response_model)  # type: ignore
            system_content = system_content + f"{json_prompt}"

        system_message = Message(role="system", content=system_content)
        user_message = Message(role="user", content=model_response.content)
        return [system_message, user_message]

    def get_messages_for_parser_model_stream(
        self, run_response: RunResponse, response_format: Optional[Union[Dict, Type[BaseModel]]]
    ) -> List[Message]:
        """Build the [system, user] messages sent to the parser model when streaming.

        The system prompt is ``self.parser_model_prompt`` when set, otherwise a default
        instruction. When ``response_format`` requests a JSON object and
        ``self.response_model`` is set, the JSON output prompt for that model is appended.
        The user message carries ``run_response.content``.
        """
        default_prompt = "You are tasked with creating a structured output from the provided data."
        system_content = default_prompt if self.parser_model_prompt is None else self.parser_model_prompt

        if response_format == {"type": "json_object"} and self.response_model is not None:
            json_prompt = get_json_output_prompt(self.response_model)  # type: ignore
            system_content = system_content + f"{json_prompt}"

        system_message = Message(role="system", content=system_content)
        user_message = Message(role="user", content=run_response.content)
        return [system_message, user_message]

    def get_messages_for_output_model(self, messages: List[Message]) -> List[Message]:
        """Prepare the conversation to send to the output model.

        NOTE: ``messages`` is modified in place and the same list object is returned.

        - If ``self.output_model_prompt`` is set, the content of the first message with
          role ``"system"`` is overwritten with it (the existing Message object is
          mutated). If there is no system message, a new one is inserted at the front.
        - The last message of the list (expected to be the assistant response) is
          removed. An empty list is returned unchanged instead of raising IndexError.

        Args:
            messages: The run's messages.

        Returns:
            List[Message]: The same ``messages`` list after modification.
        """
        if self.output_model_prompt is not None:
            for message in messages:
                if message.role == "system":
                    message.content = self.output_model_prompt
                    break
            else:
                # No system message found: add one at the front
                messages.insert(0, Message(role="system", content=self.output_model_prompt))

        # Remove the last assistant message from the messages list (guarding an empty list)
        if messages:
            messages.pop()

        return messages

    def get_session_summary(self, session_id: Optional[str] = None, user_id: Optional[str] = None):
        """Return the summary of a session.

        Returns None when no memory is configured. ``session_id`` falls back to
        ``self.session_id``. With a ``Memory``, the summary is looked up by session and
        user, where ``user_id`` falls back to ``self.user_id`` and then to ``"default"``.
        With an ``AgentMemory``, its ``summary`` attribute is returned.

        Raises:
            ValueError: If no session ID is available, or the memory type is unsupported.
        """
        memory = self.memory
        if memory is None:
            return None

        resolved_session_id = self.session_id if session_id is None else session_id
        if resolved_session_id is None:
            raise ValueError("Session ID is required")

        if isinstance(memory, Memory):
            resolved_user_id = self.user_id if user_id is None else user_id
            if resolved_user_id is None:
                resolved_user_id = "default"
            return memory.get_session_summary(session_id=resolved_session_id, user_id=resolved_user_id)
        if isinstance(memory, AgentMemory):
            return memory.summary
        raise ValueError(f"Memory type {type(memory)} not supported")

    def get_user_memories(self, user_id: Optional[str] = None) -> Optional[List[UserMemory]]:
        """Return the memories stored for a user.

        ``user_id`` falls back to ``self.user_id`` and then to ``"default"``.
        Returns None when no memory is configured.

        Raises:
            ValueError: If the memory is an ``AgentMemory`` (not supported here) or of
                an unknown type.
        """
        memory = self.memory
        if memory is None:
            return None

        if user_id is None:
            user_id = self.user_id
        if user_id is None:
            user_id = "default"

        if isinstance(memory, Memory):
            return memory.get_user_memories(user_id=user_id)
        if isinstance(memory, AgentMemory):
            raise ValueError("AgentMemory does not support get_user_memories")
        raise ValueError(f"Memory type {type(memory)} not supported")

    def deep_copy(self, *, update: Optional[Dict[str, Any]] = None) -> Agent:
        """Create and return a deep copy of this Agent, optionally updating fields.

        Every dataclass field whose value is not None is copied via
        ``_deep_copy_field``. The exceptions are ``agent_session`` and ``session_name``,
        which are not carried over to the new Agent. Entries in ``update`` are applied
        afterwards, uncopied, and override copied values.

        Args:
            update (Optional[Dict[str, Any]]): Optional dictionary of fields for the new Agent.

        Returns:
            Agent: A new Agent instance.
        """
        from dataclasses import fields

        # Do not copy agent_session and session_name to the new agent
        # (session_name was previously missing here despite this comment)
        excluded_fields = {"agent_session", "session_name"}
        # Extract the fields to set for the new Agent
        fields_for_new_agent: Dict[str, Any] = {}

        for f in fields(self):
            if f.name in excluded_fields:
                continue
            field_value = getattr(self, f.name)
            if field_value is not None:
                fields_for_new_agent[f.name] = self._deep_copy_field(f.name, field_value)

        # Update fields if provided
        if update:
            fields_for_new_agent.update(update)
        # Create a new Agent
        new_agent = self.__class__(**fields_for_new_agent)
        log_debug(f"Created new {self.__class__.__name__}")
        return new_agent

    def _deep_copy_field(self, field_name: str, field_value: Any) -> Any:
        """Helper method to deep copy a field based on its type."""
        from copy import copy, deepcopy

        # For memory and reasoning_agent, use their deep_copy methods
        if field_name == "reasoning_agent":
            return field_value.deep_copy()

        # For storage, model and reasoning_model, use a deep copy
        elif field_name in ("memory", "storage", "model", "reasoning_model"):
            try:
                return deepcopy(field_value)
            except Exception:
                try:
                    return copy(field_value)
                except Exception as e:
                    log_warning(f"Failed to copy field: {field_name} - {e}")
                    return field_value

        # For compound types, attempt a deep copy
        elif isinstance(field_value, (list, dict, set)):
            try:
                return deepcopy(field_value)
            except Exception:
                try:
                    return copy(field_value)
                except Exception as e:
                    log_warning(f"Failed to copy field: {field_name} - {e}")
                    return field_value

        # For pydantic models, attempt a model_copy
        elif isinstance(field_value, BaseModel):
            try:
                return field_value.model_copy(deep=True)
            except Exception:
                try:
                    return field_value.model_copy(deep=False)
                except Exception as e:
                    log_warning(f"Failed to copy field: {field_name} - {e}")
                    return field_value

        # For other types, attempt a shallow copy first
        try:
            from copy import copy

            return copy(field_value)
        except Exception:
            # If copy fails, return as is
            return field_value

    def get_transfer_function(self, member_agent: Agent, index: int, session_id: Optional[str] = None) -> Function:
        """Build a tool Function that lets this (leader) Agent delegate a task to ``member_agent``.

        The Function is named ``transfer_task_to_<agent_name>``. When it is called, it:
            - records leader/member linkage in both agents' ``team_data``;
            - runs the member agent on the task, streaming when ``self.stream`` is set;
            - yields the member's output, followed by ``self.team_response_separator``.

        Args:
            member_agent: The team member that will receive the task.
            index: The member's position in the team, used to name unnamed members.
            session_id: The leader's session ID, recorded in the member's ``team_data``.

        Returns:
            Function: The transfer tool. If ``member_agent.respond_directly`` is set, the
            tool shows its result and stops model execution after the call.
        """
        import unicodedata

        def _transfer_task_to_agent(
            task_description: str, expected_output: str, additional_information: Optional[str] = None
        ) -> Iterator[str]:
            if member_agent.team_data is None:
                member_agent.team_data = {}

            # Update the member agent team_data to include leader_session_id, leader_agent_id and leader_run_id
            member_agent.team_data["leader_session_id"] = session_id
            member_agent.team_data["leader_agent_id"] = self.agent_id
            member_agent.team_data["leader_run_id"] = self.run_id

            # -*- Run the agent
            member_agent_task = f"{task_description}\n\n<expected_output>\n{expected_output}\n</expected_output>"
            try:
                if additional_information is not None and additional_information.strip() != "":
                    member_agent_task += (
                        f"\n\n<additional_information>\n{additional_information}\n</additional_information>"
                    )
            except Exception as e:
                log_warning(f"Failed to add additional information to the member agent: {e}")

            # Record this member (session_id + agent_id) on the leader's team_data, once per unique pair
            member_agent_info = {
                "session_id": member_agent.session_id,
                "agent_id": member_agent.agent_id,
            }
            if self.team_data is None:
                self.team_data = {}
            if "members" not in self.team_data:
                self.team_data["members"] = [member_agent_info]
            elif member_agent_info not in self.team_data["members"]:
                self.team_data["members"].append(member_agent_info)

            if self.stream:
                for member_agent_run_response_chunk in member_agent.run(member_agent_task, stream=True):
                    # Skip streamed items that carry no content instead of yielding None
                    chunk_content = getattr(member_agent_run_response_chunk, "content", None)
                    if chunk_content is not None:
                        yield chunk_content  # type: ignore
            else:
                member_agent_run_response: RunResponse = member_agent.run(member_agent_task, stream=False)
                content = member_agent_run_response.content
                if content is None:
                    result = "No response from the member agent."
                elif isinstance(content, str):
                    result = content
                elif isinstance(content, BaseModel):
                    try:
                        result = content.model_dump_json(indent=2)
                    except Exception as e:
                        result = str(e)
                else:
                    try:
                        import json

                        result = json.dumps(content, indent=2)
                    except Exception as e:
                        result = str(e)
                # Yield outside the try blocks so errors thrown into the generator are not swallowed
                yield result
            yield self.team_response_separator

        # Give a name to the member agent
        raw_agent_name = member_agent.name if member_agent.name else f"agent_{index}"
        # Convert non-ascii characters to ascii equivalents (e.g. "é" -> "e"; characters with no
        # ASCII equivalent are dropped), then keep only alphanumerics, underscores, hyphens and spaces
        ascii_agent_name = unicodedata.normalize("NFKD", raw_agent_name).encode("ascii", "ignore").decode("ascii")
        agent_name = "".join(c for c in ascii_agent_name if c.isalnum() or c in "_- ").strip()
        agent_name = agent_name.lower().replace(" ", "_")
        if not agent_name:
            # Nothing survived sanitization (e.g. a name written entirely in a non-Latin script)
            agent_name = f"agent_{index}"

        if member_agent.name is None:
            member_agent.name = agent_name

        strict = member_agent.response_model is not None and member_agent.model is not None
        transfer_function = Function.from_callable(_transfer_task_to_agent, strict=strict)
        transfer_function.strict = strict
        transfer_function.name = f"transfer_task_to_{agent_name}"
        transfer_function.description = dedent(f"""\
        Use this function to transfer a task to {agent_name}
        You must provide a clear and concise description of the task the agent should achieve AND the expected output.
        Args:
            task_description (str): A clear and concise description of the task the agent should achieve.
            expected_output (str): The expected output from the agent.
            additional_information (Optional[str]): Additional information that will help the agent complete the task.
        Returns:
            str: The result of the delegated task.
        """)

        # If the member agent is set to respond directly, show the result of the function call and stop the model execution
        if member_agent.respond_directly:
            transfer_function.show_result = True
            transfer_function.stop_after_tool_call = True

        return transfer_function

    def get_transfer_instructions(self) -> str:
        """Describe the team members this Agent can transfer tasks to.

        Each agent in ``self.team`` is listed with 1-based numbering, plus its name,
        role and available tools when present. Tool names are taken as follows:
            - Toolkit: its function names;
            - Function: its ``name``;
            - other callables: ``__name__``, falling back to the type name for callables
              without one (e.g. ``functools.partial`` objects or callable instances).

        Returns:
            str: The instructions, or ``""`` if the team is unset or empty.
        """
        if not self.team:
            return ""

        parts: List[str] = ["You can transfer tasks to the following Agents in your team:\n"]
        for agent_number, agent in enumerate(self.team, start=1):
            parts.append(f"\nAgent {agent_number}:\n")
            if agent.name:
                parts.append(f"Name: {agent.name}\n")
            if agent.role:
                parts.append(f"Role: {agent.role}\n")
            if agent.tools is not None:
                tool_names: List[str] = []
                for tool in agent.tools:
                    if isinstance(tool, Toolkit):
                        tool_names.extend(tool.functions.keys())
                    elif isinstance(tool, Function):
                        tool_names.append(tool.name)
                    elif callable(tool):
                        tool_names.append(getattr(tool, "__name__", type(tool).__name__))
                parts.append(f"Available tools: {', '.join(tool_names)}\n")
        return "".join(parts)

    def get_relevant_docs_from_knowledge(
        self, query: str, num_documents: Optional[int] = None, filters: Optional[Dict[str, Any]] = None, **kwargs
    ) -> Optional[List[Union[Dict[str, Any], str]]]:
        """Get relevant docs from the knowledge base to answer a query.

        When a knowledge base is configured, ``filters`` are first validated against it,
        and invalid keys are dropped with a warning. A callable ``self.retriever`` takes
        precedence over the knowledge base. It is called with ``query``,
        ``num_documents`` and ``**kwargs``, plus ``agent=self`` and/or ``filters`` when
        its signature declares those parameters, and its result is returned as-is.

        Args:
            query (str): The query to search for.
            num_documents (Optional[int]): Number of documents to return. Defaults to
                ``self.knowledge.num_documents`` when a knowledge base is set.
            filters (Optional[Dict[str, Any]]): Filters to apply to the search.
            **kwargs: Additional keyword arguments, forwarded to the retriever.

        Returns:
            Optional[List[Union[Dict[str, Any], str]]]: The retriever's result, or the
            matching documents as dicts. Returns None when there is nothing to search
            or no document matched.

        Raises:
            Exception: Errors from the retriever or the knowledge search are logged and re-raised.
        """
        if self.knowledge is not None:
            if num_documents is None:
                num_documents = self.knowledge.num_documents

            # Validate the filters against known valid filter keys
            valid_filters, invalid_keys = self.knowledge.validate_filters(filters)  # type: ignore
            if invalid_keys:
                log_warning(f"Invalid filter keys provided: {invalid_keys}. These filters will be ignored.")
                log_info(f"Valid filter keys are: {self.knowledge.valid_metadata_filters}")  # type: ignore

                # Only use valid filters
                filters = valid_filters
                if not filters:
                    log_warning("No valid filters remain after validation. Search will proceed without filters.")

        if self.retriever is not None and callable(self.retriever):
            from inspect import signature

            try:
                sig = signature(self.retriever)
                retriever_kwargs: Dict[str, Any] = {}
                if "agent" in sig.parameters:
                    retriever_kwargs["agent"] = self
                if "filters" in sig.parameters:
                    retriever_kwargs["filters"] = filters
                retriever_kwargs.update({"query": query, "num_documents": num_documents, **kwargs})
                return self.retriever(**retriever_kwargs)
            except Exception as e:
                log_warning(f"Retriever failed: {e}")
                raise

        # Use knowledge base search
        if self.knowledge is None or (
            getattr(self.knowledge, "vector_db", None) is None and getattr(self.knowledge, "retriever", None) is None
        ):
            return None

        # Imported only when a knowledge search actually happens
        from agno.document import Document

        try:
            log_debug(f"Searching knowledge base with filters: {filters}")
            relevant_docs: List[Document] = self.knowledge.search(
                query=query, num_documents=num_documents, filters=filters
            )

            if not relevant_docs:
                log_debug("No relevant documents found for query")
                return None

            return [doc.to_dict() for doc in relevant_docs]
        except Exception as e:
            log_warning(f"Error searching knowledge base: {e}")
            raise

    async def aget_relevant_docs_from_knowledge(
        self, query: str, num_documents: Optional[int] = None, filters: Optional[Dict[str, Any]] = None, **kwargs
    ) -> Optional[List[Union[Dict[str, Any], str]]]:
        """Get relevant documents from the knowledge base asynchronously.

        When a knowledge base is configured, ``filters`` are first validated against it,
        and invalid keys are dropped with a warning. A callable ``self.retriever`` takes
        precedence over the knowledge base. It is called with ``query``,
        ``num_documents`` and ``**kwargs``, plus ``agent=self`` and/or ``filters`` when
        its signature declares those parameters. An awaitable result is awaited.
        Otherwise the knowledge base is searched with ``async_search``.

        Args:
            query (str): The query to search for.
            num_documents (Optional[int]): Number of documents to return. Defaults to
                ``self.knowledge.num_documents`` when a knowledge base is set.
            filters (Optional[Dict[str, Any]]): Filters to apply to the search.
            **kwargs: Additional keyword arguments, forwarded to the retriever.

        Returns:
            Optional[List[Union[Dict[str, Any], str]]]: The retriever's result, or the
            matching documents as dicts. Returns None when there is nothing to search
            or no document matched.

        Raises:
            Exception: Errors from the retriever or the knowledge search are logged and re-raised.
        """
        if self.knowledge is not None:
            if num_documents is None:
                num_documents = self.knowledge.num_documents

            # Validate the filters against known valid filter keys
            valid_filters, invalid_keys = self.knowledge.validate_filters(filters)  # type: ignore
            if invalid_keys:
                log_warning(f"Invalid filter keys provided: {invalid_keys}. These filters will be ignored.")
                log_info(f"Valid filter keys are: {self.knowledge.valid_metadata_filters}")  # type: ignore

                # Only use valid filters
                filters = valid_filters
                if not filters:
                    log_warning("No valid filters remain after validation. Search will proceed without filters.")

        if self.retriever is not None and callable(self.retriever):
            from inspect import isawaitable, signature

            try:
                sig = signature(self.retriever)
                retriever_kwargs: Dict[str, Any] = {}
                if "agent" in sig.parameters:
                    retriever_kwargs["agent"] = self
                if "filters" in sig.parameters:
                    retriever_kwargs["filters"] = filters
                retriever_kwargs.update({"query": query, "num_documents": num_documents, **kwargs})
                result = self.retriever(**retriever_kwargs)

                # Support both sync and async retrievers
                if isawaitable(result):
                    result = await result

                return result
            except Exception as e:
                log_warning(f"Retriever failed: {e}")
                raise

        # Use knowledge base search
        if self.knowledge is None or (
            getattr(self.knowledge, "vector_db", None) is None and getattr(self.knowledge, "retriever", None) is None
        ):
            return None

        # Imported only when a knowledge search actually happens
        from agno.document import Document

        try:
            log_debug(f"Searching knowledge base with filters: {filters}")
            relevant_docs: List[Document] = await self.knowledge.async_search(
                query=query, num_documents=num_documents, filters=filters
            )

            if not relevant_docs:
                log_debug("No relevant documents found for query")
                return None

            return [doc.to_dict() for doc in relevant_docs]
        except Exception as e:
            log_warning(f"Error searching knowledge base: {e}")
            raise

    def convert_documents_to_string(self, docs: List[Union[Dict[str, Any], str]]) -> str:
        """Serialize reference documents for inclusion in a prompt.

        Uses YAML when ``self.references_format == "yaml"``, otherwise indented JSON.
        Both formats write non-ASCII characters as-is (JSON via ``ensure_ascii=False``,
        YAML via ``allow_unicode=True``). Previously YAML escaped them, unlike JSON.

        Args:
            docs: The documents (dicts or strings) to serialize.

        Returns:
            str: The serialized documents, or ``""`` if ``docs`` is None or empty.
        """
        if not docs:
            return ""

        if self.references_format == "yaml":
            import yaml

            return yaml.dump(docs, allow_unicode=True)

        import json

        return json.dumps(docs, indent=2, ensure_ascii=False)

    def convert_context_to_string(self, context: Dict[str, Any]) -> str:
        """Convert the context dictionary to a string representation.

        Values JSON cannot encode natively are rendered with ``str()`` (``default=str``).
        If the dictionary as a whole still fails to serialize (e.g. keys of a type JSON
        does not allow, such as tuples, or circular references), each entry is sanitized
        separately:
            - keys that are not str/int/float/bool/None are converted with ``str()``;
            - values that still fail to serialize are replaced by their ``str()``.

        Args:
            context: Dictionary containing context data

        Returns:
            The indented JSON representation of the context, or ``""`` if ``context`` is
            None. ``str(context)`` is returned as a last resort.
        """
        if context is None:
            return ""

        import json

        try:
            return json.dumps(context, indent=2, default=str)
        except (TypeError, ValueError, OverflowError) as e:
            log_warning(f"Failed to convert context to JSON: {e}")

        # Attempt a fallback conversion for non-serializable objects
        sanitized_context: Dict[Any, Any] = {}
        for key, value in context.items():
            # JSON object keys must be str/int/float/bool/None; `default=` is never applied to keys
            safe_key = key if key is None or isinstance(key, (str, int, float, bool)) else str(key)
            try:
                # Try to serialize each value individually
                json.dumps({safe_key: value}, default=str)
                sanitized_context[safe_key] = value
            except Exception:
                # If serialization fails, convert to string representation
                sanitized_context[safe_key] = str(value)

        try:
            # default=str is required here too: values accepted above may depend on it
            return json.dumps(sanitized_context, indent=2, default=str)
        except Exception as e:
            log_error(f"Failed to convert sanitized context to JSON: {e}")
            return str(context)

    def save_run_response_to_file(
        self, message: Optional[Union[str, List, Dict, Message]] = None, session_id: Optional[str] = None
    ) -> None:
        """Write the current run response content to ``self.save_response_to_file``.

        ``save_response_to_file`` is a path template, formatted with these fields:
            - ``name``, ``user_id`` and ``run_id`` from this Agent;
            - ``session_id`` from the argument;
            - ``message`` from the argument when it is a string, otherwise None.

        Missing parent directories are created. Content is written as UTF-8:
            - string content as-is;
            - pydantic models (structured output) via ``model_dump_json(indent=2)``;
            - anything else via ``json.dumps(indent=2)``.

        Failures are logged as warnings and never raised. Does nothing when
        ``save_response_to_file`` or ``run_response`` is None.
        """
        if self.save_response_to_file is None or self.run_response is None:
            return

        message_str: Optional[str] = None
        if message is not None:
            if isinstance(message, str):
                message_str = message
            else:
                log_warning("Did not use message in output file name: message is not a string")

        try:
            from pathlib import Path

            fn = self.save_response_to_file.format(
                name=self.name,
                session_id=session_id,
                user_id=self.user_id,
                message=message_str,
                run_id=self.run_id,
            )
            fn_path = Path(fn)
            fn_path.parent.mkdir(parents=True, exist_ok=True)

            content = self.run_response.content
            if isinstance(content, str):
                text = content
            elif isinstance(content, BaseModel):
                # json.dumps cannot serialize pydantic models
                text = content.model_dump_json(indent=2)
            else:
                import json

                text = json.dumps(content, indent=2)
            # Explicit encoding: the platform default may not handle non-ASCII content
            fn_path.write_text(text, encoding="utf-8")
        except Exception as e:
            log_warning(f"Failed to save output to file: {e}")

    def update_run_response_with_reasoning(
        self, reasoning_steps: List[ReasoningStep], reasoning_agent_messages: List[Message]
    ) -> None:
        """Record reasoning output on ``self.run_response``.

        This method:
            - creates ``run_response.extra_data`` if needed;
            - appends ``reasoning_steps`` and ``reasoning_agent_messages`` to
              ``extra_data.reasoning_steps`` / ``extra_data.reasoning_messages``;
            - appends a markdown rendering of the steps to ``run_response.reasoning_content``.

        The stored lists are copies. The caller's lists are therefore never aliased, and
        later calls cannot mutate them (previously a list stored by reference was
        ``.extend()``-ed on the next call).
        """
        self.run_response = cast("RunResponse", self.run_response)
        if self.run_response.extra_data is None:
            self.run_response.extra_data = RunResponseExtraData()

        extra_data = self.run_response.extra_data

        # Update reasoning_steps (store a copy to avoid aliasing the caller's list)
        if extra_data.reasoning_steps is None:
            extra_data.reasoning_steps = list(reasoning_steps)
        else:
            extra_data.reasoning_steps.extend(reasoning_steps)

        # Update reasoning_messages (store a copy to avoid aliasing the caller's list)
        if extra_data.reasoning_messages is None:
            extra_data.reasoning_messages = list(reasoning_agent_messages)
        else:
            extra_data.reasoning_messages.extend(reasoning_agent_messages)

        # Render the steps as markdown: "## title", reasoning, "Action: ...", "Result: ...", blank line
        parts: List[str] = []
        for step in reasoning_steps:
            if step.title:
                parts.append(f"## {step.title}\n")
            if step.reasoning:
                parts.append(f"{step.reasoning}\n")
            if step.action:
                parts.append(f"Action: {step.action}\n")
            if step.result:
                parts.append(f"Result: {step.result}\n")
            parts.append("\n")
        reasoning_content = "".join(parts)

        # Add to existing reasoning_content or set it
        if not self.run_response.reasoning_content:
            self.run_response.reasoning_content = reasoning_content
        else:
            self.run_response.reasoning_content += reasoning_content

    def aggregate_metrics_from_messages(self, messages: List[Message]) -> Dict[str, Any]:
        """Collect metric values, per metric name, from this run's assistant messages.

        A message is included when all of the following hold:
            - its role is the model's assistant role (``"assistant"`` when no model is set);
            - it has non-None ``metrics``;
            - ``from_history is False``.

        Each metrics dataclass is flattened with ``asdict``. The ``timer`` entry and
        None values are skipped.

        Returns:
            Dict[str, Any]: A plain dict mapping each metric name to the list of its
            values, in message order.
        """
        assistant_message_role = self.model.assistant_message_role if self.model is not None else "assistant"
        aggregated_metrics: Dict[str, List[Any]] = defaultdict(list)
        for m in messages:
            if m.role != assistant_message_role or m.metrics is None or m.from_history is not False:
                continue
            for k, v in asdict(m.metrics).items():
                if k != "timer" and v is not None:
                    aggregated_metrics[k].append(v)
        # Return a plain dict so lookups of absent keys don't silently create entries
        return dict(aggregated_metrics)

    def calculate_metrics(self, messages: List[Message]) -> SessionMetrics:
        """Sum the metrics of this run's assistant messages into a SessionMetrics.

        A message contributes when its role is the model's assistant role
        (``"assistant"`` when no model is set), it has metrics, and
        ``from_history is False``.
        """
        totals = SessionMetrics()
        if self.model is None:
            role = "assistant"
        else:
            role = self.model.assistant_message_role

        for msg in messages:
            counted = msg.role == role and msg.metrics is not None and msg.from_history is False
            if counted:
                totals += msg.metrics
        return totals

    def rename(self, name: str, session_id: Optional[str] = None) -> None:
        """Rename the Agent and persist the change.

        The session is ``session_id``, falling back to ``self.session_id``. It is read
        from storage, ``self.name`` is set to ``name``, and the session is then written
        back to storage and logged.

        Raises:
            Exception: If neither ``session_id`` nor ``self.session_id`` is set.
        """
        if session_id is None and self.session_id is None:
            raise Exception("Session ID is not set")
        target_session_id = session_id or self.session_id

        self.read_from_storage(session_id=target_session_id)  # type: ignore
        self.name = name
        self.write_to_storage(user_id=self.user_id, session_id=target_session_id)  # type: ignore
        self._log_agent_session(user_id=self.user_id, session_id=target_session_id)  # type: ignore

    def rename_session(self, session_name: str, session_id: Optional[str] = None) -> None:
        """Rename a session and persist the change.

        The session is ``session_id``, falling back to ``self.session_id``. It is read
        from storage, ``self.session_name`` is set to ``session_name``, and the session
        is then written back to storage and logged.

        Raises:
            Exception: If neither ``session_id`` nor ``self.session_id`` is set.
        """
        if session_id is None and self.session_id is None:
            raise Exception("Session ID is not set")
        target_session_id = session_id or self.session_id

        self.read_from_storage(session_id=target_session_id)  # type: ignore
        self.session_name = session_name
        self.write_to_storage(user_id=self.user_id, session_id=target_session_id)  # type: ignore
        self._log_agent_session(user_id=self.user_id, session_id=target_session_id)  # type: ignore

    def generate_session_name(self, session_id: str, max_attempts: int = 3) -> str:
        """Generate a short name for a session using the model.

        The prompt is built from the conversation:
            - with an ``AgentMemory``: the first 3 message pairs;
            - with a ``Memory``: the session's messages.

        The model is asked for a name of at most 5 words. A response that is None, or
        longer than 15 words, is rejected and the model is asked again, up to
        ``max_attempts`` calls in total. Retries used to be unbounded (recursive).

        Args:
            session_id: The session to name.
            max_attempts: Maximum number of model calls (at least 1 call is made).

        Returns:
            str: The generated name, with double quotes removed and surrounding
            whitespace stripped.

        Raises:
            Exception: If no model is set, or no acceptable name was produced within
                ``max_attempts`` calls.
        """
        if self.model is None:
            raise Exception("Model not set")

        messages_for_generating_session_name: List[Message] = []
        if isinstance(self.memory, AgentMemory):
            try:
                message_pairs = self.memory.get_message_pairs()
                for message_pair in message_pairs[:3]:
                    messages_for_generating_session_name.append(message_pair[0])
                    messages_for_generating_session_name.append(message_pair[1])
            except Exception as e:
                log_warning(f"Failed to generate name: {e}")
        elif isinstance(self.memory, Memory):
            messages_for_generating_session_name = self.memory.get_messages_for_session(session_id=session_id)

        gen_session_name_prompt = "Conversation\n"
        for message in messages_for_generating_session_name:
            gen_session_name_prompt += f"{message.role.upper()}: {message.content}\n"
        gen_session_name_prompt += "\n\nConversation Name: "

        attempts = max(1, max_attempts)
        for _ in range(attempts):
            # Fresh messages per call, in case the model appends to the list it is given
            system_message = Message(
                role=self.system_message_role,
                content="Please provide a suitable name for this conversation in maximum 5 words. "
                "Remember, do not exceed 5 words.",
            )
            user_message = Message(role=self.user_message_role, content=gen_session_name_prompt)
            generated_name = self.model.response(messages=[system_message, user_message])
            content = generated_name.content
            if content is None:
                log_error("Generated name is None. Trying again.")
                continue
            if len(content.split()) > 15:
                log_error("Generated name is too long. Trying again.")
                continue
            return content.replace('"', "").strip()

        raise Exception(f"Failed to generate a session name after {attempts} attempts")

    def auto_rename_session(self) -> None:
        """Name the current session with the model and persist the new name.

        Loads the session from storage, calls ``generate_session_name`` for it,
        stores the result in ``self.session_name``, then writes and logs the session.

        Raises:
            Exception: If ``self.session_id`` is not set.
        """
        if self.session_id is None:
            raise Exception("Session ID is not set")

        self.read_from_storage(session_id=self.session_id)  # type: ignore

        new_name = self.generate_session_name(session_id=self.session_id)
        log_debug(f"Generated Session Name: {new_name}")
        self.session_name = new_name

        # Persist the renamed session and record it.
        self.write_to_storage(user_id=self.user_id, session_id=self.session_id)  # type: ignore
        self._log_agent_session(user_id=self.user_id, session_id=self.session_id)  # type: ignore

    def delete_session(self, session_id: str):
        """Delete ``session_id`` from ``self.storage``.

        Does nothing when no storage is configured.
        """
        storage = self.storage
        if storage is not None:
            storage.delete_session(session_id=session_id)

    def get_messages_for_session(self, session_id: Optional[str] = None) -> List[Message]:
        """Return the messages held in memory for a session.

        Args:
            session_id: Session to read. Falls back to ``self.session_id``.

        Returns:
            - For ``AgentMemory``: its ``messages`` list.
            - For v2 ``Memory``: the result of ``get_messages_from_last_n_runs``.
            - Otherwise: an empty list. An empty list is also returned (with a
              warning) when no session id is available.
        """
        target = session_id or self.session_id
        if target is None:
            log_warning("Session ID is not set, cannot get messages for session")
            return []

        # Memory may not be loaded yet; try to populate it from storage.
        if self.memory is None:
            self.read_from_storage(session_id=target)
        memory = self.memory
        if memory is None:
            return []

        if isinstance(memory, AgentMemory):
            return memory.messages
        if not isinstance(memory, Memory):
            return []

        # Only filter by agent_id when this agent is part of a team session.
        scoped_agent_id = self.agent_id if self.team_session_id is not None else None
        return memory.get_messages_from_last_n_runs(session_id=target, agent_id=scoped_agent_id)

    ###########################################################################
    # Handle images, videos and audio
    ###########################################################################

    def add_image(self, image: ImageArtifact) -> None:
        """Record ``image`` on the Agent and, if a run is active, on its response."""
        images = self.images
        if images is None:
            images = self.images = []
        images.append(image)

        response = self.run_response
        if response is None:
            return
        if response.images is None:
            response.images = []
        response.images.append(image)

    def add_video(self, video: VideoArtifact) -> None:
        """Record ``video`` on the Agent and, if a run is active, on its response."""
        videos = self.videos
        if videos is None:
            videos = self.videos = []
        videos.append(video)

        response = self.run_response
        if response is None:
            return
        if response.videos is None:
            response.videos = []
        response.videos.append(video)

    def add_audio(self, audio: AudioArtifact) -> None:
        """Record ``audio`` on the Agent and, if a run is active, on its response."""
        clips = self.audio
        if clips is None:
            clips = self.audio = []
        clips.append(audio)

        response = self.run_response
        if response is None:
            return
        if response.audio is None:
            response.audio = []
        response.audio.append(audio)

    def get_images(self) -> Optional[List[ImageArtifact]]:
        """Return ``self.images`` as-is (may be ``None``)."""
        collected = self.images
        return collected

    def get_videos(self) -> Optional[List[VideoArtifact]]:
        """Return ``self.videos`` as-is (may be ``None``)."""
        collected = self.videos
        return collected

    def get_audio(self) -> Optional[List[AudioArtifact]]:
        """Return ``self.audio`` as-is (may be ``None``)."""
        collected = self.audio
        return collected

    ###########################################################################
    # Reasoning
    ###########################################################################

    def _handle_reasoning(self, run_messages: RunMessages) -> None:
        if self.reasoning or self.reasoning_model is not None:
            reasoning_generator = self.reason(run_messages=run_messages)

            # Consume the generator without yielding
            deque(reasoning_generator, maxlen=0)

    def _handle_reasoning_stream(self, run_messages: RunMessages) -> Iterator[RunResponseEvent]:
        if self.reasoning or self.reasoning_model is not None:
            reasoning_generator = self.reason(run_messages=run_messages)
            yield from reasoning_generator

    async def _ahandle_reasoning(self, run_messages: RunMessages) -> None:
        if self.reasoning or self.reasoning_model is not None:
            reason_generator = self.areason(run_messages=run_messages)
            # Consume the generator without yielding
            async for _ in reason_generator:
                pass

    async def _ahandle_reasoning_stream(self, run_messages: RunMessages) -> AsyncIterator[RunResponseEvent]:
        if self.reasoning or self.reasoning_model is not None:
            reason_generator = self.areason(run_messages=run_messages)
            async for item in reason_generator:
                yield item

    def _format_reasoning_step_content(self, reasoning_step: ReasoningStep) -> str:
        """Format content for a reasoning step without changing any existing logic."""
        step_content = ""
        if reasoning_step.title:
            step_content += f"## {reasoning_step.title}\n"
        if reasoning_step.reasoning:
            step_content += f"{reasoning_step.reasoning}\n"
        if reasoning_step.action:
            step_content += f"Action: {reasoning_step.action}\n"
        if reasoning_step.result:
            step_content += f"Result: {reasoning_step.result}\n"
        step_content += "\n"

        # Get the current reasoning_content and append this step
        current_reasoning_content = ""
        if hasattr(self.run_response, "reasoning_content") and self.run_response.reasoning_content:  # type: ignore
            current_reasoning_content = self.run_response.reasoning_content  # type: ignore

        # Create updated reasoning_content
        updated_reasoning_content = current_reasoning_content + step_content

        return updated_reasoning_content

    def reason(self, run_messages: RunMessages) -> Iterator[RunResponseEvent]:
        """Run a reasoning phase before the main model call, yielding events as it goes.

        The reasoning path depends on the configuration:

        - ``self.reasoning_model`` is set and detected as a native reasoning model
          (DeepSeek, Groq, OpenAI, Ollama or Azure AI Foundry, per the
          ``is_*_reasoning_model`` helpers): one reasoning message is obtained and
          appended to ``run_messages.messages``.
        - Otherwise: a Chain-of-Thought reasoning agent is run repeatedly until it
          signals a final answer. This is at most ``self.reasoning_max_steps - 1``
          iterations, because the step counter starts at 1. Its messages are merged
          into ``run_messages`` with ``update_messages_with_reasoning``.

        Events (started / step / completed) are yielded only when
        ``self.stream_intermediate_steps`` is true. Most failure paths log a warning
        and return early so the regular run can continue.

        Args:
            run_messages: Messages for the current run; mutated in place.

        Yields:
            Reasoning events, already passed through ``self._handle_event``.
        """
        self.run_response = cast(RunResponse, self.run_response)
        # Yield a reasoning started event
        if self.stream_intermediate_steps:
            yield self._handle_event(
                create_reasoning_started_event(from_run_response=self.run_response), self.run_response
            )

        use_default_reasoning = False

        # Get the reasoning model; fall back to a copy of the main model so the
        # reasoning run cannot alter the main model's state.
        reasoning_model: Optional[Model] = self.reasoning_model
        reasoning_model_provided = reasoning_model is not None
        if reasoning_model is None and self.model is not None:
            from copy import deepcopy

            reasoning_model = deepcopy(self.model)
        if reasoning_model is None:
            log_warning("Reasoning error. Reasoning model is None, continuing regular session...")
            return

        # If a reasoning model is provided, use it to generate reasoning
        if reasoning_model_provided:
            from agno.reasoning.azure_ai_foundry import is_ai_foundry_reasoning_model
            from agno.reasoning.deepseek import is_deepseek_reasoning_model
            from agno.reasoning.groq import is_groq_reasoning_model
            from agno.reasoning.helpers import get_reasoning_agent
            from agno.reasoning.ollama import is_ollama_reasoning_model
            from agno.reasoning.openai import is_openai_reasoning_model

            reasoning_agent = self.reasoning_agent or get_reasoning_agent(
                reasoning_model=reasoning_model,
                monitoring=self.monitoring,
                telemetry=self.telemetry,
                debug_mode=self.debug_mode,
                debug_level=self.debug_level,
                session_state=self.session_state,
                context=self.context,
                extra_data=self.extra_data,
            )
            is_deepseek = is_deepseek_reasoning_model(reasoning_model)
            is_groq = is_groq_reasoning_model(reasoning_model)
            is_openai = is_openai_reasoning_model(reasoning_model)
            is_ollama = is_ollama_reasoning_model(reasoning_model)
            is_ai_foundry = is_ai_foundry_reasoning_model(reasoning_model)

            if is_deepseek or is_groq or is_openai or is_ollama or is_ai_foundry:
                reasoning_message: Optional[Message] = None
                if is_deepseek:
                    from agno.reasoning.deepseek import get_deepseek_reasoning

                    log_debug("Starting DeepSeek Reasoning", center=True, symbol="=")
                    reasoning_message = get_deepseek_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_groq:
                    from agno.reasoning.groq import get_groq_reasoning

                    log_debug("Starting Groq Reasoning", center=True, symbol="=")
                    reasoning_message = get_groq_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_openai:
                    from agno.reasoning.openai import get_openai_reasoning

                    log_debug("Starting OpenAI Reasoning", center=True, symbol="=")
                    reasoning_message = get_openai_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_ollama:
                    from agno.reasoning.ollama import get_ollama_reasoning

                    log_debug("Starting Ollama Reasoning", center=True, symbol="=")
                    reasoning_message = get_ollama_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_ai_foundry:
                    from agno.reasoning.azure_ai_foundry import get_ai_foundry_reasoning

                    log_debug("Starting Azure AI Foundry Reasoning", center=True, symbol="=")
                    reasoning_message = get_ai_foundry_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )

                if reasoning_message is None:
                    log_warning("Reasoning error. Reasoning response is None, continuing regular session...")
                    return
                run_messages.messages.append(reasoning_message)
                # Add reasoning step to the Agent's run_response
                self.update_run_response_with_reasoning(
                    reasoning_steps=[ReasoningStep(result=reasoning_message.content)],
                    reasoning_agent_messages=[reasoning_message],
                )
                if self.stream_intermediate_steps:
                    yield self._handle_event(
                        create_reasoning_completed_event(
                            from_run_response=self.run_response,
                            content=ReasoningSteps(reasoning_steps=[ReasoningStep(result=reasoning_message.content)]),
                            content_type=ReasoningSteps.__name__,
                        ),
                        self.run_response,
                    )
            else:
                log_warning(
                    f"Reasoning model: {reasoning_model.__class__.__name__} is not a native reasoning model, defaulting to manual Chain-of-Thought reasoning"
                )
                use_default_reasoning = True
        # If no reasoning model is provided, use default reasoning
        else:
            use_default_reasoning = True

        if use_default_reasoning:
            from agno.reasoning.default import get_default_reasoning_agent
            from agno.reasoning.helpers import get_next_action, update_messages_with_reasoning

            # Get default reasoning agent
            reasoning_agent: Optional[Agent] = self.reasoning_agent  # type: ignore
            if reasoning_agent is None:
                reasoning_agent = get_default_reasoning_agent(
                    reasoning_model=reasoning_model,
                    min_steps=self.reasoning_min_steps,
                    max_steps=self.reasoning_max_steps,
                    tools=self.tools,
                    use_json_mode=self.use_json_mode,
                    monitoring=self.monitoring,
                    telemetry=self.telemetry,
                    debug_mode=self.debug_mode,
                    debug_level=self.debug_level,
                    session_state=self.session_state,
                    context=self.context,
                    extra_data=self.extra_data,
                )

            # Validate reasoning agent
            if reasoning_agent is None:
                log_warning("Reasoning error. Reasoning agent is None, continuing regular session...")
                return
            # Ensure the reasoning agent response model is ReasoningSteps (or a subclass).
            # The previous check (`not isinstance(rm, type) and not issubclass(rm, ...)`)
            # never rejected a wrong class and raised TypeError for a non-class value.
            agent_response_model = reasoning_agent.response_model
            if agent_response_model is not None and not (
                isinstance(agent_response_model, type) and issubclass(agent_response_model, ReasoningSteps)
            ):
                log_warning("Reasoning agent response model should be `ReasoningSteps`, continuing regular session...")
                return
            # Ensure the reasoning model and agent do not show tool calls
            reasoning_agent.show_tool_calls = False

            step_count = 1
            next_action = NextAction.CONTINUE
            reasoning_messages: List[Message] = []
            all_reasoning_steps: List[ReasoningStep] = []
            log_debug("Starting Reasoning", center=True, symbol="=")
            while next_action == NextAction.CONTINUE and step_count < self.reasoning_max_steps:
                log_debug(f"Step {step_count}", center=True, symbol="=")
                try:
                    # Run the reasoning agent
                    reasoning_agent_response: RunResponse = reasoning_agent.run(
                        messages=run_messages.get_input_messages()
                    )
                    if reasoning_agent_response.content is None or reasoning_agent_response.messages is None:
                        log_warning("Reasoning error. Reasoning response is empty, continuing regular session...")
                        break

                    if (
                        reasoning_agent_response.content.reasoning_steps is None
                        or len(reasoning_agent_response.content.reasoning_steps) == 0
                    ):
                        log_warning("Reasoning error. Reasoning steps are empty, continuing regular session...")
                        break

                    reasoning_steps: List[ReasoningStep] = reasoning_agent_response.content.reasoning_steps
                    all_reasoning_steps.extend(reasoning_steps)
                    # Yield reasoning steps
                    if self.stream_intermediate_steps:
                        for reasoning_step in reasoning_steps:
                            updated_reasoning_content = self._format_reasoning_step_content(reasoning_step)

                            yield self._handle_event(
                                create_reasoning_step_event(
                                    from_run_response=self.run_response,
                                    reasoning_step=reasoning_step,
                                    reasoning_content=updated_reasoning_content,
                                ),
                                self.run_response,
                            )

                    # Find the index of the first assistant message
                    first_assistant_index = next(
                        (i for i, m in enumerate(reasoning_agent_response.messages) if m.role == "assistant"),
                        len(reasoning_agent_response.messages),
                    )
                    # Keep messages from the first assistant message (inclusive) onward;
                    # only the latest iteration's messages are kept.
                    reasoning_messages = reasoning_agent_response.messages[first_assistant_index:]

                    # Add reasoning step to the Agent's run_response
                    self.update_run_response_with_reasoning(
                        reasoning_steps=reasoning_steps, reasoning_agent_messages=reasoning_agent_response.messages
                    )
                    # Get the next action
                    next_action = get_next_action(reasoning_steps[-1])
                    if next_action == NextAction.FINAL_ANSWER:
                        break
                except Exception as e:
                    log_error(f"Reasoning error: {e}")
                    break

                step_count += 1

            log_debug(f"Total Reasoning steps: {len(all_reasoning_steps)}")
            log_debug("Reasoning finished", center=True, symbol="=")

            # Update the messages_for_model to include reasoning messages
            update_messages_with_reasoning(
                run_messages=run_messages,
                reasoning_messages=reasoning_messages,
            )

            # Yield the final reasoning completed event
            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_reasoning_completed_event(
                        from_run_response=self.run_response,
                        content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                        content_type=ReasoningSteps.__name__,
                    ),
                    self.run_response,
                )

    async def areason(self, run_messages: RunMessages) -> AsyncIterator[RunResponseEvent]:
        """Async counterpart of ``reason``: run the reasoning phase, yielding events.

        The reasoning path depends on the configuration:

        - ``self.reasoning_model`` is set and detected as a native reasoning model: one
          reasoning message is obtained and appended to ``run_messages.messages``.
        - Otherwise: a Chain-of-Thought reasoning agent is awaited repeatedly (at most
          ``self.reasoning_max_steps - 1`` iterations) until it signals a final answer.
          Its messages are then merged into ``run_messages``.

        Events are yielded only when ``self.stream_intermediate_steps`` is true. Most
        failure paths log a warning and return early so the regular run can continue.

        Args:
            run_messages: Messages for the current run; mutated in place.

        Yields:
            Reasoning events, already passed through ``self._handle_event``.
        """
        self.run_response = cast(RunResponse, self.run_response)
        # Yield a reasoning started event
        if self.stream_intermediate_steps:
            yield self._handle_event(
                create_reasoning_started_event(from_run_response=self.run_response), self.run_response
            )

        use_default_reasoning = False

        # Get the reasoning model; fall back to a copy of the main model so the
        # reasoning run cannot alter the main model's state.
        reasoning_model: Optional[Model] = self.reasoning_model
        reasoning_model_provided = reasoning_model is not None
        if reasoning_model is None and self.model is not None:
            from copy import deepcopy

            reasoning_model = deepcopy(self.model)
        if reasoning_model is None:
            log_warning("Reasoning error. Reasoning model is None, continuing regular session...")
            return

        # If a reasoning model is provided, use it to generate reasoning
        if reasoning_model_provided:
            from agno.reasoning.azure_ai_foundry import is_ai_foundry_reasoning_model
            from agno.reasoning.deepseek import is_deepseek_reasoning_model
            from agno.reasoning.groq import is_groq_reasoning_model
            from agno.reasoning.helpers import get_reasoning_agent
            from agno.reasoning.ollama import is_ollama_reasoning_model
            from agno.reasoning.openai import is_openai_reasoning_model

            reasoning_agent = self.reasoning_agent or get_reasoning_agent(
                reasoning_model=reasoning_model,
                monitoring=self.monitoring,
                telemetry=self.telemetry,
                debug_mode=self.debug_mode,
                debug_level=self.debug_level,
                session_state=self.session_state,
                context=self.context,
                extra_data=self.extra_data,
            )
            is_deepseek = is_deepseek_reasoning_model(reasoning_model)
            is_groq = is_groq_reasoning_model(reasoning_model)
            is_openai = is_openai_reasoning_model(reasoning_model)
            is_ollama = is_ollama_reasoning_model(reasoning_model)
            is_ai_foundry = is_ai_foundry_reasoning_model(reasoning_model)

            if is_deepseek or is_groq or is_openai or is_ollama or is_ai_foundry:
                reasoning_message: Optional[Message] = None
                if is_deepseek:
                    from agno.reasoning.deepseek import aget_deepseek_reasoning

                    log_debug("Starting DeepSeek Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_deepseek_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_groq:
                    from agno.reasoning.groq import aget_groq_reasoning

                    log_debug("Starting Groq Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_groq_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_openai:
                    from agno.reasoning.openai import aget_openai_reasoning

                    log_debug("Starting OpenAI Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_openai_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_ollama:
                    # NOTE(review): synchronous call inside an async method; it blocks the
                    # event loop for the duration of the request. Switch to an async variant
                    # if agno.reasoning.ollama provides one.
                    from agno.reasoning.ollama import get_ollama_reasoning

                    log_debug("Starting Ollama Reasoning", center=True, symbol="=")
                    reasoning_message = get_ollama_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )
                elif is_ai_foundry:
                    # NOTE(review): synchronous call inside an async method; see Ollama note above.
                    from agno.reasoning.azure_ai_foundry import get_ai_foundry_reasoning

                    log_debug("Starting Azure AI Foundry Reasoning", center=True, symbol="=")
                    reasoning_message = get_ai_foundry_reasoning(
                        reasoning_agent=reasoning_agent, messages=run_messages.get_input_messages()
                    )

                if reasoning_message is None:
                    log_warning("Reasoning error. Reasoning response is None, continuing regular session...")
                    return
                run_messages.messages.append(reasoning_message)
                # Add reasoning step to the Agent's run_response
                self.update_run_response_with_reasoning(
                    reasoning_steps=[ReasoningStep(result=reasoning_message.content)],
                    reasoning_agent_messages=[reasoning_message],
                )
                if self.stream_intermediate_steps:
                    yield self._handle_event(
                        create_reasoning_completed_event(
                            from_run_response=self.run_response,
                            content=ReasoningSteps(reasoning_steps=[ReasoningStep(result=reasoning_message.content)]),
                            content_type=ReasoningSteps.__name__,
                        ),
                        self.run_response,
                    )
            else:
                log_warning(
                    f"Reasoning model: {reasoning_model.__class__.__name__} is not a native reasoning model, defaulting to manual Chain-of-Thought reasoning"
                )
                use_default_reasoning = True
        # If no reasoning model is provided, use default reasoning
        else:
            use_default_reasoning = True

        if use_default_reasoning:
            from agno.reasoning.default import get_default_reasoning_agent
            from agno.reasoning.helpers import get_next_action, update_messages_with_reasoning

            # Get default reasoning agent
            reasoning_agent: Optional[Agent] = self.reasoning_agent  # type: ignore
            if reasoning_agent is None:
                reasoning_agent = get_default_reasoning_agent(
                    reasoning_model=reasoning_model,
                    min_steps=self.reasoning_min_steps,
                    max_steps=self.reasoning_max_steps,
                    tools=self.tools,
                    use_json_mode=self.use_json_mode,
                    monitoring=self.monitoring,
                    telemetry=self.telemetry,
                    debug_mode=self.debug_mode,
                    debug_level=self.debug_level,
                    session_state=self.session_state,
                    context=self.context,
                    extra_data=self.extra_data,
                )

            # Validate reasoning agent
            if reasoning_agent is None:
                log_warning("Reasoning error. Reasoning agent is None, continuing regular session...")
                return
            # Ensure the reasoning agent response model is ReasoningSteps (or a subclass).
            # The previous check (`not isinstance(rm, type) and not issubclass(rm, ...)`)
            # never rejected a wrong class and raised TypeError for a non-class value.
            agent_response_model = reasoning_agent.response_model
            if agent_response_model is not None and not (
                isinstance(agent_response_model, type) and issubclass(agent_response_model, ReasoningSteps)
            ):
                log_warning("Reasoning agent response model should be `ReasoningSteps`, continuing regular session...")
                return

            # Ensure the reasoning model and agent do not show tool calls
            reasoning_agent.show_tool_calls = False

            step_count = 1
            next_action = NextAction.CONTINUE
            reasoning_messages: List[Message] = []
            all_reasoning_steps: List[ReasoningStep] = []
            log_debug("Starting Reasoning", center=True, symbol="=")
            while next_action == NextAction.CONTINUE and step_count < self.reasoning_max_steps:
                log_debug(f"Step {step_count}", center=True, symbol="=")
                step_count += 1
                try:
                    # Run the reasoning agent
                    reasoning_agent_response: RunResponse = await reasoning_agent.arun(
                        messages=run_messages.get_input_messages()
                    )
                    if reasoning_agent_response.content is None or reasoning_agent_response.messages is None:
                        log_warning("Reasoning error. Reasoning response is empty, continuing regular session...")
                        break

                    # An empty list must stop here too: `reasoning_steps[-1]` below would
                    # otherwise raise IndexError (matches the sync `reason`).
                    if (
                        reasoning_agent_response.content.reasoning_steps is None
                        or len(reasoning_agent_response.content.reasoning_steps) == 0
                    ):
                        log_warning("Reasoning error. Reasoning steps are empty, continuing regular session...")
                        break

                    reasoning_steps: List[ReasoningStep] = reasoning_agent_response.content.reasoning_steps
                    all_reasoning_steps.extend(reasoning_steps)
                    # Yield reasoning steps
                    if self.stream_intermediate_steps:
                        for reasoning_step in reasoning_steps:
                            updated_reasoning_content = self._format_reasoning_step_content(reasoning_step)

                            # Yield the response with the updated reasoning_content
                            yield self._handle_event(
                                create_reasoning_step_event(
                                    from_run_response=self.run_response,
                                    reasoning_step=reasoning_step,
                                    reasoning_content=updated_reasoning_content,
                                ),
                                self.run_response,
                            )

                    # Find the index of the first assistant message
                    first_assistant_index = next(
                        (i for i, m in enumerate(reasoning_agent_response.messages) if m.role == "assistant"),
                        len(reasoning_agent_response.messages),
                    )
                    # Keep messages from the first assistant message (inclusive) onward;
                    # only the latest iteration's messages are kept.
                    reasoning_messages = reasoning_agent_response.messages[first_assistant_index:]

                    # Add reasoning step to the Agent's run_response
                    self.update_run_response_with_reasoning(
                        reasoning_steps=reasoning_steps, reasoning_agent_messages=reasoning_agent_response.messages
                    )

                    # Get the next action
                    next_action = get_next_action(reasoning_steps[-1])
                    if next_action == NextAction.FINAL_ANSWER:
                        break
                except Exception as e:
                    log_error(f"Reasoning error: {e}")
                    break

            log_debug(f"Total Reasoning steps: {len(all_reasoning_steps)}")
            log_debug("Reasoning finished", center=True, symbol="=")

            # Update the messages_for_model to include reasoning messages
            update_messages_with_reasoning(
                run_messages=run_messages,
                reasoning_messages=reasoning_messages,
            )

            # Yield the final reasoning completed event
            if self.stream_intermediate_steps:
                yield self._handle_event(
                    create_reasoning_completed_event(
                        from_run_response=self.run_response,
                        content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                        content_type=ReasoningSteps.__name__,
                    ),
                    self.run_response,
                )

    def _process_parser_response(
        self,
        model_response: ModelResponse,
        run_messages: RunMessages,
        parser_model_response: ModelResponse,
        messages_for_parser_model: list,
    ) -> None:
        """Common logic for processing parser model response."""
        parser_model_response_message: Optional[Message] = None
        for message in reversed(messages_for_parser_model):
            if message.role == "assistant":
                parser_model_response_message = message
                break

        if parser_model_response_message is not None:
            run_messages.messages.append(parser_model_response_message)
            model_response.parsed = parser_model_response.parsed
            model_response.content = parser_model_response.content
        else:
            log_warning("Unable to parse response with parser model")

    def _parse_response_with_parser_model(self, model_response: ModelResponse, run_messages: RunMessages) -> None:
        """Parse the model response using the parser model."""
        if self.parser_model is None:
            return

        if self.response_model is not None:
            parser_response_format = self._get_response_format(self.parser_model)
            messages_for_parser_model = self.get_messages_for_parser_model(model_response, parser_response_format)
            parser_model_response: ModelResponse = self.parser_model.response(
                messages=messages_for_parser_model,
                response_format=parser_response_format,
            )
            self._process_parser_response(
                model_response, run_messages, parser_model_response, messages_for_parser_model
            )
        else:
            log_warning("A response model is required to parse the response with a parser model")

    async def _aparse_response_with_parser_model(
        self, model_response: ModelResponse, run_messages: RunMessages
    ) -> None:
        """Parse the model response using the parser model."""
        if self.parser_model is None:
            return

        if self.response_model is not None:
            parser_response_format = self._get_response_format(self.parser_model)
            messages_for_parser_model = self.get_messages_for_parser_model(model_response, parser_response_format)
            parser_model_response: ModelResponse = await self.parser_model.aresponse(
                messages=messages_for_parser_model,
                response_format=parser_response_format,
            )
            self._process_parser_response(
                model_response, run_messages, parser_model_response, messages_for_parser_model
            )
        else:
            log_warning("A response model is required to parse the response with a parser model")

    def _parse_response_with_parser_model_stream(
        self, run_response: RunResponse, stream_intermediate_steps: bool = True
    ):
        """Parse the model response using the parser model"""
        if self.parser_model is not None:
            if self.response_model is not None:
                if stream_intermediate_steps:
                    yield self._handle_event(create_parser_model_response_started_event(run_response), run_response)

                parser_model_response = ModelResponse(content="")
                parser_response_format = self._get_response_format(self.parser_model)
                messages_for_parser_model = self.get_messages_for_parser_model_stream(
                    run_response, parser_response_format
                )
                for model_response_event in self.parser_model.response_stream(
                    messages=messages_for_parser_model,
                    response_format=parser_response_format,
                    stream_model_response=False,
                ):
                    yield from self._handle_model_response_chunk(
                        run_response=run_response,
                        model_response=parser_model_response,
                        model_response_event=model_response_event,
                        parse_structured_output=True,
                        stream_intermediate_steps=stream_intermediate_steps,
                    )

                parser_model_response_message: Optional[Message] = None
                for message in reversed(messages_for_parser_model):
                    if message.role == "assistant":
                        parser_model_response_message = message
                        break
                if parser_model_response_message is not None:
                    if run_response.messages is not None:
                        run_response.messages.append(parser_model_response_message)
                else:
                    log_warning("Unable to parse response with parser model")

                if stream_intermediate_steps:
                    yield self._handle_event(create_parser_model_response_completed_event(run_response), run_response)

            else:
                log_warning("A response model is required to parse the response with a parser model")

    async def _aparse_response_with_parser_model_stream(
        self, run_response: RunResponse, stream_intermediate_steps: bool = True
    ):
        """Parse the model response using the parser model stream."""
        if self.parser_model is not None:
            if self.response_model is not None:
                if stream_intermediate_steps:
                    yield self._handle_event(create_parser_model_response_started_event(run_response), run_response)

                parser_model_response = ModelResponse(content="")
                parser_response_format = self._get_response_format(self.parser_model)
                messages_for_parser_model = self.get_messages_for_parser_model_stream(
                    run_response, parser_response_format
                )
                model_response_stream = self.parser_model.aresponse_stream(
                    messages=messages_for_parser_model,
                    response_format=parser_response_format,
                    stream_model_response=False,
                )
                async for model_response_event in model_response_stream:  # type: ignore
                    for event in self._handle_model_response_chunk(
                        run_response=run_response,
                        model_response=parser_model_response,
                        model_response_event=model_response_event,
                        parse_structured_output=True,
                        stream_intermediate_steps=stream_intermediate_steps,
                    ):
                        yield event

                parser_model_response_message: Optional[Message] = None
                for message in reversed(messages_for_parser_model):
                    if message.role == "assistant":
                        parser_model_response_message = message
                        break
                if parser_model_response_message is not None:
                    if run_response.messages is not None:
                        run_response.messages.append(parser_model_response_message)
                else:
                    log_warning("Unable to parse response with parser model")

                if stream_intermediate_steps:
                    yield self._handle_event(create_parser_model_response_completed_event(run_response), run_response)
            else:
                log_warning("A response model is required to parse the response with a parser model")

    def _generate_response_with_output_model(self, model_response: ModelResponse, run_messages: RunMessages) -> None:
        """Parse the model response using the output model."""
        if self.output_model is None:
            return

        messages_for_output_model = self.get_messages_for_output_model(run_messages.messages)
        output_model_response: ModelResponse = self.output_model.response(messages=messages_for_output_model)
        model_response.content = output_model_response.content

    def _generate_response_with_output_model_stream(
        self, run_response: RunResponse, run_messages: RunMessages, stream_intermediate_steps: bool = False
    ):
        """Stream the final answer from the output model and refresh the run's messages/metrics.

        Yields nothing when ``output_model`` is unset. Otherwise:

        * every chunk from ``output_model.response_stream`` is forwarded through
          ``_handle_model_response_chunk``;
        * started/completed events bracket the stream when
          ``stream_intermediate_steps`` is True;
        * afterwards, ``run_response.messages`` is reset to the run messages
          flagged ``add_to_agent_memory``, and ``run_response.metrics`` is
          recomputed from them.
        """
        from agno.utils.events import (
            create_output_model_response_completed_event,
            create_output_model_response_started_event,
        )

        if self.output_model is None:
            return

        if stream_intermediate_steps:
            started = create_output_model_response_started_event(run_response)
            yield self._handle_event(started, run_response)

        output_prompt = self.get_messages_for_output_model(run_messages.messages)
        streamed_response = ModelResponse(content="")
        for chunk in self.output_model.response_stream(messages=output_prompt):
            yield from self._handle_model_response_chunk(
                run_response=run_response,
                model_response=streamed_response,
                model_response_event=chunk,
            )

        if stream_intermediate_steps:
            completed = create_output_model_response_completed_event(run_response)
            yield self._handle_event(completed, run_response)

        # Only messages flagged for agent memory are exposed on the RunResponse.
        kept_messages = [msg for msg in run_messages.messages if msg.add_to_agent_memory]
        run_response.messages = kept_messages
        run_response.metrics = self.aggregate_metrics_from_messages(kept_messages)

    async def _agenerate_response_with_output_model(self, model_response: ModelResponse, run_messages: RunMessages):
        """Parse the model response using the output model."""
        if self.output_model is None:
            return

        messages_for_output_model = self.get_messages_for_output_model(run_messages.messages)
        output_model_response: ModelResponse = await self.output_model.aresponse(messages=messages_for_output_model)
        model_response.content = output_model_response.content

    async def _agenerate_response_with_output_model_stream(
        self, run_response: RunResponse, run_messages: RunMessages, stream_intermediate_steps: bool = False
    ):
        """Async-generator counterpart of ``_generate_response_with_output_model_stream``.

        Consumes ``output_model.aresponse_stream`` and re-yields the events from
        ``_handle_model_response_chunk`` for every chunk. It then resets
        ``run_response.messages`` to the run messages flagged
        ``add_to_agent_memory`` and recomputes ``run_response.metrics`` from them.
        """
        from agno.utils.events import (
            create_output_model_response_completed_event,
            create_output_model_response_started_event,
        )

        if self.output_model is None:
            return

        if stream_intermediate_steps:
            started = create_output_model_response_started_event(run_response)
            yield self._handle_event(started, run_response)

        output_prompt = self.get_messages_for_output_model(run_messages.messages)
        streamed_response = ModelResponse(content="")
        async for chunk in self.output_model.aresponse_stream(messages=output_prompt):
            for chunk_event in self._handle_model_response_chunk(
                run_response=run_response,
                model_response=streamed_response,
                model_response_event=chunk,
            ):
                yield chunk_event

        if stream_intermediate_steps:
            completed = create_output_model_response_completed_event(run_response)
            yield self._handle_event(completed, run_response)

        # Only messages flagged for agent memory are exposed on the RunResponse.
        kept_messages = [msg for msg in run_messages.messages if msg.add_to_agent_memory]
        run_response.messages = kept_messages
        run_response.metrics = self.aggregate_metrics_from_messages(kept_messages)

    def _handle_event(self, event: RunResponseEvent, run_response: RunResponse):
        # We only store events that are not run_response_content events
        events_to_skip = [event.value for event in self.events_to_skip] if self.events_to_skip else []
        if self.store_events and event.event not in events_to_skip:
            if run_response.events is None:
                run_response.events = []
            run_response.events.append(event)
        return event

    ###########################################################################
    # Default Tools
    ###########################################################################

    def get_update_user_memory_function(self, user_id: Optional[str] = None, async_mode: bool = False) -> Function:
        """Create the ``update_user_memory`` tool for this agent.

        A sync and an async variant are both defined, and ``async_mode`` picks
        which one is wrapped. Each variant forwards the model-supplied ``task``,
        together with the captured ``user_id``, to ``self.memory`` and returns its
        reply.

        Args:
            user_id: User the memory task is attributed to.
            async_mode: Wrap the coroutine variant (``aupdate_memory_task``) instead
                of the blocking one (``update_memory_task``).

        Returns:
            Function: The selected callable, registered as ``update_user_memory``.
        """

        # NOTE: the inner docstrings are handed to Function.from_callable with the
        # callable (presumably as the tool description), so they are kept verbatim.
        async def aupdate_user_memory(task: str) -> str:
            """Use this function to update the Agent's memory of a user.
            Describe the task in detail and be specific.
            The task can include adding a memory, updating a memory, deleting a memory, or clearing all memories.

            Args:
                task: The task to update the memory. Be specific and describe the task in detail.

            Returns:
                str: A string indicating the status of the task.
            """
            self.memory = cast(Memory, self.memory)
            return await self.memory.aupdate_memory_task(task=task, user_id=user_id)

        def update_user_memory(task: str) -> str:
            """Use this function to submit a task to modify the Agent's memory.
            Describe the task in detail and be specific.
            The task can include adding a memory, updating a memory, deleting a memory, or clearing all memories.

            Args:
                task: The task to update the memory. Be specific and describe the task in detail.

            Returns:
                str: A string indicating the status of the task.
            """
            self.memory = cast(Memory, self.memory)
            return self.memory.update_memory_task(task=task, user_id=user_id)

        selected = aupdate_user_memory if async_mode else update_user_memory
        return Function.from_callable(selected, name="update_user_memory")  # type: ignore

    def get_chat_history_function(self, session_id: str) -> Callable:
        """Build the ``get_chat_history`` tool for ``session_id``.

        The returned callable serialises the conversation held in ``self.memory``
        to a JSON list in chronological order. When ``num_chats`` is given, both
        memory back-ends keep only the most recent entries, as the tool docstring
        promises. It returns ``""`` when there is no history or the memory type is
        not recognised.

        Args:
            session_id: Session whose messages are read when ``self.memory`` is a ``Memory``.

        Returns:
            Callable: The ``get_chat_history`` function.
        """

        def get_chat_history(num_chats: Optional[int] = None) -> str:
            """Use this function to get the chat history between the user and agent.

            Args:
                num_chats: The number of chats to return.
                    Each chat contains 2 messages. One from the user and one from the agent.
                    Default: None

            Returns:
                str: A JSON of a list of dictionaries representing the chat history.

            Example:
                - To get the last chat, use num_chats=1.
                - To get the last 5 chats, use num_chats=5.
                - To get all chats, use num_chats=None.
                - To get the first chat, use num_chats=None and pick the first message.
            """
            import json

            history: List[Dict[str, Any]] = []
            if isinstance(self.memory, AgentMemory):
                agent_chats = self.memory.get_message_pairs()

                if len(agent_chats) == 0:
                    return ""

                # Walk pairs newest-first, prepending so the result stays chronological.
                chats_added = 0
                for chat in agent_chats[::-1]:
                    history.insert(0, chat[1].to_dict())
                    history.insert(0, chat[0].to_dict())
                    chats_added += 1
                    if num_chats is not None and chats_added >= num_chats:
                        break

            elif isinstance(self.memory, Memory):
                all_chats = self.memory.get_messages_for_session(session_id=session_id)

                if len(all_chats) == 0:
                    return ""

                # The messages are already chronological, so serialise them in one pass.
                history = [chat.to_dict() for chat in all_chats]  # type: ignore

                if num_chats is not None:
                    # Slice from the end so the most recent entries are kept ("to get the last
                    # chat, use num_chats=1"); a head slice would return the oldest ones.
                    # NOTE(review): this counts individual messages, not user/agent pairs as the
                    # AgentMemory branch does — confirm the intended unit.
                    history = history[-num_chats:] if num_chats > 0 else []

            else:
                return ""

            return json.dumps(history)

        return get_chat_history

    def get_tool_call_history_function(self, session_id: str) -> Callable:
        """Build the ``get_tool_call_history`` tool for ``session_id``.

        The returned callable asks ``self.memory`` for recent tool calls and
        returns them JSON-encoded. It returns ``""`` when there are none or the
        memory type is not recognised.

        Args:
            session_id: Session passed to ``Memory.get_tool_calls`` (not used for ``AgentMemory``).

        Returns:
            Callable: The ``get_tool_call_history`` function.
        """
        # NOTE(review): the tool docstring advertises num_calls=None for "all tool calls",
        # but the parameter is annotated ``int`` — confirm whether it should be Optional[int].

        def get_tool_call_history(num_calls: int = 3) -> str:
            """Use this function to get the tools called by the agent in reverse chronological order.

            Args:
                num_calls: The number of tool calls to return.
                    Default: 3

            Returns:
                str: A JSON of a list of dictionaries representing the tool call history.

            Example:
                - To get the last tool call, use num_calls=1.
                - To get all tool calls, use num_calls=None.
            """
            import json

            # Both memory back-ends expose get_tool_calls; only Memory is session-scoped.
            if isinstance(self.memory, AgentMemory):
                lookup_kwargs: Dict[str, Any] = {"num_calls": num_calls}
            elif isinstance(self.memory, Memory):
                lookup_kwargs = {"session_id": session_id, "num_calls": num_calls}
            else:
                return ""

            tool_calls = self.memory.get_tool_calls(**lookup_kwargs)  # type: ignore
            if len(tool_calls) == 0:
                return ""
            log_debug(f"tool_calls: {tool_calls}")
            return json.dumps(tool_calls)

        return get_tool_call_history

    def search_knowledge_base_function(
        self, knowledge_filters: Optional[Dict[str, Any]] = None, async_mode: bool = False
    ) -> Function:
        """Build the ``search_knowledge_base`` tool with fixed ``knowledge_filters``.

        Each lookup is timed. When documents are found, a ``MessageReferences``
        entry is appended to ``self.run_response.extra_data.references``, and the
        documents are returned as a string.

        Args:
            knowledge_filters: Filters forwarded unchanged to every knowledge lookup.
            async_mode: Wrap the coroutine variant (``aget_relevant_docs_from_knowledge``)
                instead of the blocking one (``get_relevant_docs_from_knowledge``).

        Returns:
            Function: The selected callable, registered as ``search_knowledge_base``.
        """

        def _record_references(query: str, docs_from_knowledge: Any, elapsed: float) -> None:
            """Append a references entry for this lookup to the current run response."""
            run_response = cast(RunResponse, self.run_response)
            references = MessageReferences(query=query, references=docs_from_knowledge, time=round(elapsed, 4))
            if run_response.extra_data is None:
                run_response.extra_data = RunResponseExtraData()
            if run_response.extra_data.references is None:
                run_response.extra_data.references = []
            run_response.extra_data.references.append(references)

        # NOTE: the inner docstrings are handed to Function.from_callable (presumably as the
        # tool description), so they are kept verbatim.
        def search_knowledge_base(query: str) -> str:
            """Use this function to search the knowledge base for information about a query.

            Args:
                query: The query to search for.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            self.run_response = cast(RunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = self.get_relevant_docs_from_knowledge(query=query, filters=knowledge_filters)
            if docs_from_knowledge is not None:
                _record_references(query, docs_from_knowledge, retrieval_timer.elapsed)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self.convert_documents_to_string(docs_from_knowledge)

        async def asearch_knowledge_base(query: str) -> str:
            """Use this function to search the knowledge base for information about a query asynchronously.

            Args:
                query: The query to search for.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            self.run_response = cast(RunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = await self.aget_relevant_docs_from_knowledge(query=query, filters=knowledge_filters)
            if docs_from_knowledge is not None:
                _record_references(query, docs_from_knowledge, retrieval_timer.elapsed)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self.convert_documents_to_string(docs_from_knowledge)

        selected = asearch_knowledge_base if async_mode else search_knowledge_base
        return Function.from_callable(selected, name="search_knowledge_base")  # type: ignore

    def search_knowledge_base_with_agentic_filters_function(
        self, knowledge_filters: Optional[Dict[str, Any]] = None, async_mode: bool = False
    ) -> Function:
        """Build the ``search_knowledge_base_with_agentic_filters`` tool.

        Unlike ``search_knowledge_base_function``, the model may pass its own
        ``filters``. The filters actually used are chosen by
        ``_get_agentic_or_user_search_filters`` from the model's filters and
        ``knowledge_filters``. Found documents are recorded as references on
        ``self.run_response`` and returned as a string.

        Args:
            knowledge_filters: User-side filters weighed against the model's filters on every call.
            async_mode: Wrap the coroutine variant instead of the blocking one.

        Returns:
            Function: The selected callable, registered as ``search_knowledge_base_with_agentic_filters``.
        """

        def _record_references(query: str, docs_from_knowledge: Any, elapsed: float) -> None:
            """Append a references entry for this lookup to the current run response."""
            run_response = cast(RunResponse, self.run_response)
            references = MessageReferences(query=query, references=docs_from_knowledge, time=round(elapsed, 4))
            if run_response.extra_data is None:
                run_response.extra_data = RunResponseExtraData()
            if run_response.extra_data.references is None:
                run_response.extra_data.references = []
            run_response.extra_data.references.append(references)

        # NOTE: the inner docstrings are handed to Function.from_callable (presumably as the
        # tool description), so they are kept verbatim.
        def search_knowledge_base(query: str, filters: Optional[Dict[str, Any]] = None) -> str:
            """Use this function to search the knowledge base for information about a query.

            Args:
                query: The query to search for.
                filters: The filters to apply to the search. This is a dictionary of key-value pairs.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            search_filters = self._get_agentic_or_user_search_filters(filters, knowledge_filters)

            self.run_response = cast(RunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = self.get_relevant_docs_from_knowledge(query=query, filters=search_filters)
            if docs_from_knowledge is not None:
                _record_references(query, docs_from_knowledge, retrieval_timer.elapsed)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self.convert_documents_to_string(docs_from_knowledge)

        async def asearch_knowledge_base(query: str, filters: Optional[Dict[str, Any]] = None) -> str:
            """Use this function to search the knowledge base for information about a query asynchronously.

            Args:
                query: The query to search for.
                filters: The filters to apply to the search. This is a dictionary of key-value pairs.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            search_filters = self._get_agentic_or_user_search_filters(filters, knowledge_filters)

            self.run_response = cast(RunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = await self.aget_relevant_docs_from_knowledge(query=query, filters=search_filters)
            if docs_from_knowledge is not None:
                _record_references(query, docs_from_knowledge, retrieval_timer.elapsed)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self.convert_documents_to_string(docs_from_knowledge)

        selected = asearch_knowledge_base if async_mode else search_knowledge_base
        return Function.from_callable(selected, name="search_knowledge_base_with_agentic_filters")  # type: ignore

    def _get_agentic_or_user_search_filters(
        self, filters: Optional[Dict[str, Any]], effective_filters: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Pick the knowledge filters to use for an agentic search.

        User-supplied filters always take priority. A non-empty
        ``effective_filters`` is returned whether or not the agent proposed
        filters of its own. Otherwise the agent's ``filters`` are used, and ``{}``
        (no filtering) is returned when neither is set.

        Args:
            filters: Filters passed by the agent in the tool call.
            effective_filters: Filters passed by the user.

        Returns:
            Dict[str, Any]: The final filters to use for the search.
        """
        search_filters: Dict[str, Any] = {}

        if effective_filters:
            # User filters override anything the agent proposes, and they must still apply
            # when the agent passes none; otherwise the user's restriction would be ignored.
            search_filters = effective_filters
        elif filters:
            # No user filters: fall back to whatever the agent asked for.
            search_filters = filters

        log_info(f"Filters used by Agent: {search_filters}")
        return search_filters

    def add_to_knowledge(self, query: str, result: str) -> str:
        """Use this function to add information to the knowledge base for future use.

        Args:
            query: The query to add.
            result: The result of the query.

        Returns:
            str: A string indicating the status of the addition.
        """
        # The docstring above is kept verbatim because this method is exposed as a tool.
        import json

        from agno.document import Document

        knowledge = self.knowledge
        if knowledge is None:
            return "Knowledge base not available"

        # Prefer the agent's name; otherwise derive one from the query:
        # spaces become "_", and "?", "!" and "." are removed.
        document_name = self.name
        if document_name is None:
            document_name = query.translate(str.maketrans({" ": "_", "?": None, "!": None, ".": None}))

        document_content = json.dumps({"query": query, "result": result})
        log_info(f"Adding document to knowledge base: {document_name}: {document_content}")
        knowledge.load_document(document=Document(name=document_name, content=document_content))
        return "Successfully added to knowledge base"

    def update_memory(self, task: str) -> str:
        """Use this function to update the Agent's memory. Describe the task in detail.

        Args:
            task: The task to update the memory with.

        Returns:
            str: A string indicating the status of the task.
        """
        # The docstring above is kept verbatim because this method is exposed as a tool.
        # Errors are reported back as text rather than raised.
        self.memory = cast(AgentMemory, self.memory)
        try:
            outcome = self.memory.update_memory(input=task, force=True) or "Memory updated successfully"
        except Exception as exc:
            outcome = f"Failed to update memory: {exc}"
        return outcome

    ###########################################################################
    # Api functions
    ###########################################################################

    def _log_agent_session(self, session_id: str, user_id: Optional[str] = None):
        if not (self.telemetry or self.monitoring):
            return

        from agno.api.agent import AgentSessionCreate, create_agent_session

        try:
            agent_session: AgentSession = self.agent_session or self.get_agent_session(
                session_id=session_id, user_id=user_id
            )
            create_agent_session(
                session=AgentSessionCreate(
                    session_id=agent_session.session_id,
                    agent_data=agent_session.to_dict() if self.monitoring else agent_session.telemetry_data(),
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create agent monitor: {e}")

    def _create_run_data(self) -> Dict[str, Any]:
        """Assemble the run payload sent to the agno API.

        The payload always holds the serialised ``Function`` tools and the run
        metrics. When monitoring is on, it also holds the run input, the
        serialised run response, and the response format: ``"json"`` when a
        ``response_model`` is set, else ``"markdown"`` when markdown is enabled,
        else ``"text"``.
        """
        self.run_response = cast(RunResponse, self.run_response)

        if self.response_model is not None:
            run_response_format = "json"
        elif self.markdown:
            run_response_format = "markdown"
        else:
            run_response_format = "text"

        # Only real Function objects are serialisable; anything else is skipped.
        functions: Dict[str, Any] = {}
        if self._functions_for_model is not None:
            for f_name, func in self._functions_for_model.items():
                if isinstance(func, Function):
                    functions[f_name] = func.to_dict()

        run_data: Dict[str, Any] = {"functions": functions, "metrics": self.run_response.metrics}
        if self.monitoring:
            run_data["run_input"] = self.run_input
            run_data["run_response"] = self.run_response.to_dict()
            run_data["run_response_format"] = run_response_format
        return run_data

    def _log_agent_run(self, session_id: str, user_id: Optional[str] = None) -> None:
        self.set_monitoring()

        if not self.telemetry and not self.monitoring:
            return

        from agno.api.agent import AgentRunCreate, create_agent_run

        try:
            run_data = self._create_run_data()
            agent_session: AgentSession = self.agent_session or self.get_agent_session(
                session_id=session_id, user_id=user_id
            )

            create_agent_run(
                run=AgentRunCreate(
                    run_id=self.run_id,
                    run_data=run_data,
                    session_id=agent_session.session_id,
                    agent_data=agent_session.to_dict() if self.monitoring else agent_session.telemetry_data(),
                    team_session_id=agent_session.team_session_id,
                    workflow_session_id=agent_session.workflow_session_id,
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create agent event: {e}")

    async def _alog_agent_run(self, session_id: str, user_id: Optional[str] = None) -> None:
        self.set_monitoring()

        if not self.telemetry and not self.monitoring:
            return

        from agno.api.agent import AgentRunCreate, acreate_agent_run

        try:
            run_data = self._create_run_data()
            agent_session: AgentSession = self.agent_session or self.get_agent_session(
                session_id=session_id, user_id=user_id
            )

            await acreate_agent_run(
                run=AgentRunCreate(
                    run_id=self.run_id,
                    run_data=run_data,
                    session_id=agent_session.session_id,
                    agent_data=agent_session.to_dict() if self.monitoring else agent_session.telemetry_data(),
                    team_session_id=agent_session.team_session_id,
                    workflow_session_id=agent_session.workflow_session_id,
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create agent event: {e}")

    ###########################################################################
    # Print Response
    ###########################################################################

    def print_response(
        self,
        message: Optional[Union[List, Dict, str, Message, BaseModel]] = None,
        *,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        messages: Optional[List[Union[Dict, Message]]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        stream: Optional[bool] = None,
        stream_intermediate_steps: bool = False,
        markdown: bool = False,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        console: Optional[Any] = None,
        # Add tags to include in markdown content (never mutated here, so the shared default is safe)
        tags_to_include_in_markdown: Set[str] = {"think", "thinking"},
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run the agent and render its output to the terminal with ``rich``.

        The input message, reasoning steps, thinking, tool calls, the response,
        citations and memory/summary-update notices are each rendered as a panel
        inside a ``rich.live.Live`` display. In streaming mode the panels are rebuilt
        after every event yielded by ``self.run``; otherwise they are built once from
        the value returned by ``self.run``.

        Args:
            message: Input passed to ``self.run``; also shown in a "Message" panel
                when ``show_message`` is true.
            session_id, session_state, user_id, messages, audio, images, videos,
            files, knowledge_filters, **kwargs: Passed through to ``self.run``.
            stream: Whether to stream the run. ``None`` falls back to ``self.stream``
                (then ``False``); an explicit ``False`` disables streaming even when
                ``self.stream`` is set.
            stream_intermediate_steps: Passed to ``self.run``, OR-ed with
                ``self.stream_intermediate_steps``.
            markdown: If true, sets ``self.markdown = True`` on the agent (a persistent
                change) so string responses are rendered as Markdown. When
                ``self.response_model`` is set, ``self.markdown`` is forced to ``False``.
            show_message: Render the input message panel.
            show_reasoning: Render one panel per reasoning step.
            show_full_reasoning: Also render each step's reasoning and confidence;
                implies ``show_reasoning``.
            console: Optional ``rich`` console handed to ``Live``.
            tags_to_include_in_markdown: Tags passed to ``escape_markdown_tags``
                before Markdown conversion.

        Returns:
            None. If the run pauses, a paused-run panel is rendered instead of the
            remaining panels.
        """
        import json

        from rich.console import Group
        from rich.json import JSON
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        if markdown:
            self.markdown = True

        if self.response_model is not None:
            # Structured output is rendered as JSON, never as Markdown.
            self.markdown = False

        stream_intermediate_steps = stream_intermediate_steps or self.stream_intermediate_steps
        # Only fall back to the agent default when the caller did not decide; the previous
        # `stream or self.stream` turned an explicit `stream=False` into True.
        if stream is None:
            stream = self.stream or False
        # Asking for full reasoning details implies showing the reasoning steps at all
        # (the same rule aprint_response's streaming display uses).
        render_reasoning = show_reasoning or show_full_reasoning

        def _message_panel() -> Any:
            """Build the cyan-bordered "Message" panel for the input message."""
            return create_panel(
                content=Text(get_text_from_message(message), style="green"),
                title="Message",
                border_style="cyan",
            )

        def _reasoning_step_panels(steps: Any) -> List[Any]:
            """Build one "Reasoning step N" panel per reasoning step (numbered from 1)."""
            step_panels: List[Any] = []
            for i, step in enumerate(steps, 1):
                step_content = Text.assemble()
                if step.title is not None:
                    step_content.append(f"{step.title}\n", "bold")
                if step.action is not None:
                    step_content.append(Text.from_markup(f"[bold]Action:[/bold] {step.action}\n", style="dim"))
                if step.result is not None:
                    step_content.append(Text.from_markup(step.result, style="dim"))

                if show_full_reasoning:
                    # Add detailed reasoning information if available
                    if step.reasoning is not None:
                        step_content.append(
                            Text.from_markup(f"\n[bold]Reasoning:[/bold] {step.reasoning}", style="dim")
                        )
                    if step.confidence is not None:
                        step_content.append(
                            Text.from_markup(f"\n[bold]Confidence:[/bold] {step.confidence}", style="dim")
                        )
                step_panels.append(
                    create_panel(content=step_content, title=f"Reasoning step {i}", border_style="green")
                )
            return step_panels

        def _tool_calls_panel(formatted_tool_calls: Any) -> Any:
            """Build the "Tool Calls" panel with one bullet per formatted tool call."""
            tool_calls_content = Text()
            for formatted_tool_call in formatted_tool_calls:
                tool_calls_content.append(f"• {formatted_tool_call}\n")
            return create_panel(
                content=tool_calls_content.plain.rstrip(),
                title="Tool Calls",
                border_style="yellow",
            )

        def _citations_panel(citation_urls: Any) -> Optional[Any]:
            """Build a numbered Markdown "Citations" panel, or return None if no citation has a URL."""
            # Number only the citations that are actually listed, so the list always starts at 1.
            md_content = "\n".join(
                f"{i}. [{citation.title or citation.url}]({citation.url})"
                for i, citation in enumerate((c for c in citation_urls if c.url), 1)
            )
            if not md_content:
                return None
            return create_panel(content=Markdown(md_content), title="Citations", border_style="green")

        def _memory_update_panels() -> List[Any]:
            """Return notice panels for memory/summary updates, clearing the update flags it reports."""
            notices: List[Any] = []
            if not isinstance(self.memory, Memory):
                return notices
            memory_manager = self.memory.memory_manager
            if memory_manager is not None and memory_manager.memories_updated:
                notices.append(create_panel(content=Text("Memories updated"), title="Memories", border_style="green"))
                memory_manager.memories_updated = False
            summary_manager = self.memory.summary_manager
            if summary_manager is not None and summary_manager.summary_updated:
                notices.append(
                    create_panel(
                        content=Text("Session summary updated"),
                        title="Session Summary",
                        border_style="green",
                    )
                )
                summary_manager.summary_updated = False
            return notices

        if stream:
            _response_content: str = ""
            _response_thinking: str = ""
            _response_reasoning_content: str = ""
            response_content_batch: Union[str, JSON, Markdown] = ""
            reasoning_steps: List[ReasoningStep] = []
            # The set of event classes is invariant, so compute it once instead of per event.
            run_response_event_types = tuple(get_args(RunResponseEvent))

            with Live(console=console) as live_log:
                status = Status("Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10)
                live_log.update(status)
                response_timer = Timer()
                response_timer.start()
                panels = [status]
                # Show the message immediately, before the first event arrives.
                if message and show_message:
                    panels.append(_message_panel())
                    live_log.update(Group(*panels))

                for resp in self.run(
                    message=message,
                    messages=messages,
                    session_id=session_id,
                    session_state=session_state,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=knowledge_filters,
                    **kwargs,
                ):
                    if isinstance(resp, run_response_event_types):
                        if resp.is_paused:
                            resp = cast(RunResponsePausedEvent, resp)
                            panels.append(create_paused_run_response_panel(resp))
                            live_log.update(Group(*panels))
                            break
                        if resp.event == RunEvent.run_response_content:
                            if hasattr(resp, "content"):
                                if isinstance(resp.content, str):
                                    _response_content += resp.content
                                elif self.response_model is not None and isinstance(resp.content, BaseModel):
                                    try:
                                        response_content_batch = JSON(  # type: ignore
                                            resp.content.model_dump_json(exclude_none=True), indent=2
                                        )
                                    except Exception as e:
                                        log_warning(f"Failed to convert response to JSON: {e}")
                                else:
                                    try:
                                        response_content_batch = JSON(json.dumps(resp.content), indent=4)
                                    except Exception as e:
                                        log_warning(f"Failed to convert response to JSON: {e}")
                            if hasattr(resp, "thinking") and resp.thinking is not None:
                                _response_thinking += resp.thinking
                            if hasattr(resp, "reasoning_content") and resp.reasoning_content is not None:
                                _response_reasoning_content += resp.reasoning_content
                        if (
                            hasattr(resp, "extra_data")
                            and resp.extra_data is not None
                            and resp.extra_data.reasoning_steps is not None
                        ):
                            reasoning_steps = resp.extra_data.reasoning_steps

                    # Escape special tags before markdown conversion
                    if self.markdown:
                        escaped_content = escape_markdown_tags(_response_content, tags_to_include_in_markdown)
                        response_content_batch = Markdown(escaped_content)

                    # Rebuild the whole panel list from the accumulated state for this event.
                    panels = [status]
                    if message and show_message:
                        panels.append(_message_panel())
                    if reasoning_steps and render_reasoning:
                        panels.extend(_reasoning_step_panels(reasoning_steps))
                    if _response_thinking:
                        panels.append(
                            create_panel(
                                content=Text(_response_thinking),
                                title=f"Thinking ({response_timer.elapsed:.1f}s)",
                                border_style="green",
                            )
                        )
                    if _response_reasoning_content:
                        panels.append(
                            create_panel(
                                content=Text(_response_reasoning_content),
                                title=f"Reasoning ({response_timer.elapsed:.1f}s)",
                                border_style="green",
                            )
                        )
                    if (
                        self.show_tool_calls
                        and self.run_response is not None
                        and self.run_response.formatted_tool_calls
                    ):
                        panels.append(_tool_calls_panel(self.run_response.formatted_tool_calls))

                    # Raw streamed text is shown as-is unless Markdown is on; otherwise use the
                    # batch renderable (Markdown, or JSON for structured/non-string content).
                    if _response_content and not self.markdown:
                        response_content = _response_content
                    else:
                        response_content = response_content_batch  # type: ignore

                    # Sanitize empty Markdown content
                    if isinstance(response_content, Markdown) and not (
                        response_content.markup and response_content.markup.strip()
                    ):
                        response_content = None  # type: ignore

                    if response_content:
                        panels.append(
                            create_panel(
                                content=response_content,
                                title=f"Response ({response_timer.elapsed:.1f}s)",
                                border_style="blue",
                            )
                        )

                    if (
                        isinstance(resp, run_response_event_types)
                        and hasattr(resp, "citations")
                        and resp.citations is not None
                        and resp.citations.urls is not None
                    ):
                        citations_panel = _citations_panel(resp.citations.urls)
                        if citations_panel is not None:
                            panels.append(citations_panel)

                    # A single update per event: only the final state of each event is ever visible.
                    live_log.update(Group(*panels))

                panels.extend(_memory_update_panels())
                response_timer.stop()

                # Final update to remove the "Thinking..." status
                panels = [p for p in panels if not isinstance(p, Status)]
                live_log.update(Group(*panels))
        else:
            with Live(console=console) as live_log:
                status = Status("Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10)
                live_log.update(status)
                response_timer = Timer()
                response_timer.start()
                panels = [status]
                # Show the message while the (blocking) run is in progress.
                if message and show_message:
                    panels.append(_message_panel())
                    live_log.update(Group(*panels))

                # Run the agent
                run_response = self.run(
                    message=message,
                    messages=messages,
                    session_id=session_id,
                    session_state=session_state,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    stream=False,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=knowledge_filters,
                    **kwargs,
                )
                response_timer.stop()

                if isinstance(run_response, RunResponse) and run_response.is_paused:
                    panels.append(create_paused_run_response_panel(run_response))
                    # Drop the spinner so no stale "Thinking..." is left in the final output.
                    panels = [p for p in panels if not isinstance(p, Status)]
                    live_log.update(Group(*panels))
                    return

                reasoning_steps = []
                if (
                    isinstance(run_response, RunResponse)
                    and run_response.extra_data is not None
                    and run_response.extra_data.reasoning_steps is not None
                ):
                    reasoning_steps = run_response.extra_data.reasoning_steps

                if reasoning_steps and render_reasoning:
                    panels.extend(_reasoning_step_panels(reasoning_steps))

                if isinstance(run_response, RunResponse) and run_response.thinking is not None:
                    panels.append(
                        create_panel(
                            content=Text(run_response.thinking),
                            title=f"Thinking ({response_timer.elapsed:.1f}s)",
                            border_style="green",
                        )
                    )

                if self.show_tool_calls and isinstance(run_response, RunResponse) and run_response.formatted_tool_calls:
                    panels.append(_tool_calls_panel(run_response.formatted_tool_calls))

                response_content_batch = ""
                if isinstance(run_response, RunResponse):
                    if isinstance(run_response.content, str):
                        if self.markdown:
                            escaped_content = escape_markdown_tags(run_response.content, tags_to_include_in_markdown)
                            response_content_batch = Markdown(escaped_content)
                        else:
                            response_content_batch = run_response.get_content_as_string(indent=4)
                    elif self.response_model is not None and isinstance(run_response.content, BaseModel):
                        try:
                            response_content_batch = JSON(
                                run_response.content.model_dump_json(exclude_none=True), indent=2
                            )
                        except Exception as e:
                            log_warning(f"Failed to convert response to JSON: {e}")
                    else:
                        try:
                            response_content_batch = JSON(json.dumps(run_response.content), indent=4)
                        except Exception as e:
                            log_warning(f"Failed to convert response to JSON: {e}")

                panels.append(
                    create_panel(
                        content=response_content_batch,
                        title=f"Response ({response_timer.elapsed:.1f}s)",
                        border_style="blue",
                    )
                )

                if (
                    isinstance(run_response, RunResponse)
                    and run_response.citations is not None
                    and run_response.citations.urls is not None
                ):
                    citations_panel = _citations_panel(run_response.citations.urls)
                    if citations_panel is not None:
                        panels.append(citations_panel)

                panels.extend(_memory_update_panels())

                # Final update to remove the "Thinking..." status
                panels = [p for p in panels if not isinstance(p, Status)]
                live_log.update(Group(*panels))

    async def aprint_response(
        self,
        message: Optional[Union[List, Dict, str, Message, BaseModel]] = None,
        *,
        messages: Optional[List[Union[Dict, Message]]] = None,
        session_id: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        stream: Optional[bool] = None,
        stream_intermediate_steps: bool = False,
        markdown: bool = False,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        console: Optional[Any] = None,
        # Add tags to include in markdown content
        tags_to_include_in_markdown: Set[str] = {"think", "thinking"},
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        import json

        from rich.console import Group
        from rich.json import JSON
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        if markdown:
            self.markdown = True

        if self.response_model is not None:
            self.markdown = False

        stream_intermediate_steps = stream_intermediate_steps or self.stream_intermediate_steps
        stream = stream or self.stream or False
        if stream:
            _response_content: str = ""
            _response_thinking: str = ""
            _response_reasoning_content: str = ""
            reasoning_steps: List[ReasoningStep] = []
            response_content_batch: Union[str, JSON, Markdown] = ""

            with Live(console=console) as live_log:
                status = Status("Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10)
                live_log.update(status)
                response_timer = Timer()
                response_timer.start()
                # Flag which indicates if the panels should be rendered
                render = False
                # Panels to be rendered
                panels = [status]
                # First render the message panel if the message is not None
                if message and show_message:
                    render = True
                    # Convert message to a panel
                    message_content = get_text_from_message(message)
                    message_panel = create_panel(
                        content=Text(message_content, style="green"),
                        title="Message",
                        border_style="cyan",
                    )
                    panels.append(message_panel)
                if render:
                    live_log.update(Group(*panels))

                result = await self.arun(
                    message=message,
                    messages=messages,
                    session_id=session_id,
                    session_state=session_state,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=knowledge_filters,
                    **kwargs,
                )

                async for resp in result:
                    if isinstance(resp, tuple(get_args(RunResponseEvent))):
                        if resp.is_paused:
                            response_panel = create_paused_run_response_panel(resp)
                            panels.append(response_panel)
                            live_log.update(Group(*panels))
                            break

                        if resp.event == RunEvent.run_response_content:
                            if isinstance(resp.content, str):
                                _response_content += resp.content
                            elif self.response_model is not None and isinstance(resp.content, BaseModel):
                                try:
                                    response_content_batch = JSON(
                                        resp.content.model_dump_json(exclude_none=True), indent=2
                                    )  # type: ignore
                                except Exception as e:
                                    log_warning(f"Failed to convert response to JSON: {e}")
                            else:
                                try:
                                    response_content_batch = JSON(json.dumps(resp.content), indent=4)
                                except Exception as e:
                                    log_warning(f"Failed to convert response to JSON: {e}")
                            if resp.thinking is not None:
                                _response_thinking += resp.thinking
                            if hasattr(resp, "reasoning_content") and resp.reasoning_content is not None:
                                _response_reasoning_content += resp.reasoning_content

                        if (
                            hasattr(resp, "extra_data")
                            and resp.extra_data is not None
                            and resp.extra_data.reasoning_steps is not None
                        ):
                            reasoning_steps = resp.extra_data.reasoning_steps

                    response_content_stream: str = _response_content
                    # Escape special tags before markdown conversion
                    if self.markdown:
                        escaped_content = escape_markdown_tags(_response_content, tags_to_include_in_markdown)
                        response_content_batch = Markdown(escaped_content)

                    panels = [status]

                    if message and show_message:
                        render = True
                        # Convert message to a panel
                        message_content = get_text_from_message(message)
                        message_panel = create_panel(
                            content=Text(message_content, style="green"),
                            title="Message",
                            border_style="cyan",
                        )
                        panels.append(message_panel)
                    if render:
                        live_log.update(Group(*panels))

                    if len(reasoning_steps) > 0 and (show_reasoning or show_full_reasoning):
                        render = True
                        # Create panels for reasoning steps
                        for i, step in enumerate(reasoning_steps, 1):
                            # Build step content
                            step_content = Text.assemble()
                            if step.title is not None:
                                step_content.append(f"{step.title}\n", "bold")
                            if step.action is not None:
                                step_content.append(
                                    Text.from_markup(f"[bold]Action:[/bold] {step.action}\n", style="dim")
                                )
                            if step.result is not None:
                                step_content.append(Text.from_markup(step.result, style="dim"))

                            if show_full_reasoning:
                                # Add detailed reasoning information if available
                                if step.reasoning is not None:
                                    step_content.append(
                                        Text.from_markup(f"\n[bold]Reasoning:[/bold] {step.reasoning}", style="dim")
                                    )
                                if step.confidence is not None:
                                    step_content.append(
                                        Text.from_markup(f"\n[bold]Confidence:[/bold] {step.confidence}", style="dim")
                                    )
                            reasoning_panel = create_panel(
                                content=step_content, title=f"Reasoning step {i}", border_style="green"
                            )
                            panels.append(reasoning_panel)
                    if render:
                        live_log.update(Group(*panels))

                    if len(_response_thinking) > 0:
                        render = True
                        # Create panel for thinking
                        thinking_panel = create_panel(
                            content=Text(_response_thinking),
                            title=f"Thinking ({response_timer.elapsed:.1f}s)",
                            border_style="green",
                        )
                        panels.append(thinking_panel)
                    if render:
                        live_log.update(Group(*panels))

                    if len(_response_reasoning_content) > 0:
                        render = True
                        # Create panel for reasoning content
                        reasoning_panel = create_panel(
                            content=Text(_response_reasoning_content),
                            title=f"Reasoning ({response_timer.elapsed:.1f}s)",
                            border_style="green",
                        )
                        panels.append(reasoning_panel)
                    if render:
                        live_log.update(Group(*panels))

                    # Add tool calls panel if available
                    if (
                        self.show_tool_calls
                        and self.run_response is not None
                        and self.run_response.formatted_tool_calls
                    ):
                        render = True
                        # Create bullet points for each tool call
                        tool_calls_content = Text()
                        for formatted_tool_call in self.run_response.formatted_tool_calls:
                            tool_calls_content.append(f"• {formatted_tool_call}\n")

                        tool_calls_panel = create_panel(
                            content=tool_calls_content.plain.rstrip(),
                            title="Tool Calls",
                            border_style="yellow",
                        )
                        panels.append(tool_calls_panel)
                        live_log.update(Group(*panels))

                    response_panel = None
                    # Check if we have any response content to display
                    if response_content_stream and not self.markdown:
                        response_content = response_content_stream
                    else:
                        response_content = response_content_batch  # type: ignore

                    # Sanitize empty Markdown content
                    if isinstance(response_content, Markdown):
                        if not (response_content.markup and response_content.markup.strip()):
                            response_content = None  # type: ignore

                    if response_content:
                        render = True
                        response_panel = create_panel(
                            content=response_content,
                            title=f"Response ({response_timer.elapsed:.1f}s)",
                            border_style="blue",
                        )
                        panels.append(response_panel)
                    if render:
                        live_log.update(Group(*panels))

                    if (
                        isinstance(resp, tuple(get_args(RunResponseEvent)))
                        and hasattr(resp, "citations")
                        and resp.citations is not None
                        and resp.citations.urls is not None
                    ):
                        md_content = "\n".join(
                            f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                            for i, citation in enumerate(resp.citations.urls)
                            if citation.url  # Only include citations with valid URLs
                        )
                        if md_content:  # Only create panel if there are citations
                            citations_panel = create_panel(
                                content=Markdown(md_content),
                                title="Citations",
                                border_style="green",
                            )
                            panels.append(citations_panel)
                            live_log.update(Group(*panels))

                if self.memory is not None and isinstance(self.memory, Memory):
                    if self.memory.memory_manager is not None and self.memory.memory_manager.memories_updated:
                        memory_panel = create_panel(
                            content=Text("Memories updated"),
                            title="Memories",
                            border_style="green",
                        )
                        panels.append(memory_panel)
                        live_log.update(Group(*panels))
                        self.memory.memory_manager.memories_updated = False

                    if self.memory.summary_manager is not None and self.memory.summary_manager.summary_updated:
                        summary_panel = create_panel(
                            content=Text("Session summary updated"),
                            title="Session Summary",
                            border_style="green",
                        )
                        panels.append(summary_panel)
                        live_log.update(Group(*panels))
                        self.memory.summary_manager.summary_updated = False

                response_timer.stop()

                # Final update to remove the "Thinking..." status
                panels = [p for p in panels if not isinstance(p, Status)]
                live_log.update(Group(*panels))
        else:
            with Live(console=console) as live_log:
                status = Status("Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10)
                live_log.update(status)
                response_timer = Timer()
                response_timer.start()
                # Panels to be rendered
                panels = [status]
                # First render the message panel if the message is not None
                if message and show_message:
                    # Convert message to a panel
                    message_content = get_text_from_message(message)
                    message_panel = create_panel(
                        content=Text(message_content, style="green"),
                        title="Message",
                        border_style="cyan",
                    )
                    panels.append(message_panel)
                    live_log.update(Group(*panels))

                # Run the agent
                run_response = await self.arun(
                    message=message,
                    messages=messages,
                    session_id=session_id,
                    session_state=session_state,
                    user_id=user_id,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    stream=False,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=knowledge_filters,
                    **kwargs,
                )
                response_timer.stop()

                if isinstance(run_response, RunResponse) and run_response.is_paused:
                    response_panel = create_paused_run_response_panel(run_response)
                    panels.append(response_panel)
                    live_log.update(Group(*panels))
                    return

                reasoning_steps = []
                if (
                    isinstance(run_response, RunResponse)
                    and run_response.extra_data is not None
                    and run_response.extra_data.reasoning_steps is not None
                ):
                    reasoning_steps = run_response.extra_data.reasoning_steps

                if len(reasoning_steps) > 0 and show_reasoning:
                    # Create panels for reasoning steps
                    for i, step in enumerate(reasoning_steps, 1):
                        # Build step content
                        step_content = Text.assemble()
                        if step.title is not None:
                            step_content.append(f"{step.title}\n", "bold")
                        if step.action is not None:
                            step_content.append(Text.from_markup(f"[bold]Action:[/bold] {step.action}\n", style="dim"))
                        if step.result is not None:
                            step_content.append(Text.from_markup(step.result, style="dim"))

                        if show_full_reasoning:
                            # Add detailed reasoning information if available
                            if step.reasoning is not None:
                                step_content.append(
                                    Text.from_markup(f"\n[bold]Reasoning:[/bold] {step.reasoning}", style="dim")
                                )
                            if step.confidence is not None:
                                step_content.append(
                                    Text.from_markup(f"\n[bold]Confidence:[/bold] {step.confidence}", style="dim")
                                )
                        reasoning_panel = create_panel(
                            content=step_content, title=f"Reasoning step {i}", border_style="green"
                        )
                        panels.append(reasoning_panel)
                    live_log.update(Group(*panels))

                if isinstance(run_response, RunResponse) and run_response.thinking is not None:
                    # Create panel for thinking
                    thinking_panel = create_panel(
                        content=Text(run_response.thinking),
                        title=f"Thinking ({response_timer.elapsed:.1f}s)",
                        border_style="green",
                    )
                    panels.append(thinking_panel)
                    live_log.update(Group(*panels))

                if self.show_tool_calls and isinstance(run_response, RunResponse) and run_response.formatted_tool_calls:
                    tool_calls_content = Text()
                    for formatted_tool_call in run_response.formatted_tool_calls:
                        tool_calls_content.append(f"• {formatted_tool_call}\n")

                    tool_calls_panel = create_panel(
                        content=tool_calls_content.plain.rstrip(),
                        title="Tool Calls",
                        border_style="yellow",
                    )
                    panels.append(tool_calls_panel)
                    live_log.update(Group(*panels))

                response_content_batch: Union[str, JSON, Markdown] = ""  # type: ignore
                if isinstance(run_response, RunResponse):
                    if isinstance(run_response.content, str):
                        if self.markdown:
                            escaped_content = escape_markdown_tags(run_response.content, tags_to_include_in_markdown)
                            response_content_batch = Markdown(escaped_content)
                        else:
                            response_content_batch = run_response.get_content_as_string(indent=4)
                    elif self.response_model is not None and isinstance(run_response.content, BaseModel):
                        try:
                            response_content_batch = JSON(
                                run_response.content.model_dump_json(exclude_none=True), indent=2
                            )
                        except Exception as e:
                            log_warning(f"Failed to convert response to JSON: {e}")
                    else:
                        try:
                            response_content_batch = JSON(json.dumps(run_response.content), indent=4)
                        except Exception as e:
                            log_warning(f"Failed to convert response to JSON: {e}")

                # Create panel for response
                response_panel = create_panel(
                    content=response_content_batch,
                    title=f"Response ({response_timer.elapsed:.1f}s)",
                    border_style="blue",
                )
                panels.append(response_panel)

                if (
                    isinstance(run_response, RunResponse)
                    and run_response.citations is not None
                    and run_response.citations.urls is not None
                ):
                    md_content = "\n".join(
                        f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                        for i, citation in enumerate(run_response.citations.urls)
                        if citation.url  # Only include citations with valid URLs
                    )
                    if md_content:  # Only create panel if there are citations
                        citations_panel = create_panel(
                            content=Markdown(md_content),
                            title="Citations",
                            border_style="green",
                        )
                        panels.append(citations_panel)
                        live_log.update(Group(*panels))

                if self.memory is not None and isinstance(self.memory, Memory):
                    if self.memory.memory_manager is not None and self.memory.memory_manager.memories_updated:
                        memory_panel = create_panel(
                            content=Text("Memories updated"),
                            title="Memories",
                            border_style="green",
                        )
                        panels.append(memory_panel)
                        live_log.update(Group(*panels))
                        self.memory.memory_manager.memories_updated = False

                    if self.memory.summary_manager is not None and self.memory.summary_manager.summary_updated:
                        summary_panel = create_panel(
                            content=Text("Session summary updated"),
                            title="Session Summary",
                            border_style="green",
                        )
                        panels.append(summary_panel)
                        live_log.update(Group(*panels))
                        self.memory.summary_manager.summary_updated = False

                # Final update to remove the "Thinking..." status
                panels = [p for p in panels if not isinstance(p, Status)]
                live_log.update(Group(*panels))

    def update_reasoning_content_from_tool_call(
        self, tool_name: str, tool_args: Dict[str, Any]
    ) -> Optional[ReasoningStep]:
        """Turn a reasoning-style tool call into a ReasoningStep and formatted reasoning text.

        Recognized calls (tool names are matched case-insensitively):

        1. ``think`` with ``title`` and ``thought`` (ReasoningTools.think); optional ``action``
           and ``confidence``.
        2. ``analyze`` with ``title`` (ReasoningTools.analyze); optional ``result``,
           ``analysis``, ``next_action`` and ``confidence``.
        3. ``think`` with only ``thought`` (ThinkingTools.think).

        For a recognized call, the step is appended to ``run_response.extra_data.reasoning_steps``
        and a markdown rendering is appended to ``run_response.reasoning_content``.

        Args:
            tool_name: Name of the tool that was called.
            tool_args: Arguments the tool was called with.

        Returns:
            The created ReasoningStep, or None if the call is not a recognized reasoning tool.
        """
        normalized_tool_name = tool_name.lower()

        # Case 1: ReasoningTools.think (has title, thought, optional action and confidence)
        if normalized_tool_name == "think" and "title" in tool_args and "thought" in tool_args:
            title = tool_args["title"]
            thought = tool_args["thought"]
            action = tool_args.get("action", "")
            confidence = tool_args.get("confidence", None)

            reasoning_step = ReasoningStep(
                title=title,
                reasoning=thought,
                action=action,
                next_action=NextAction.CONTINUE,
                confidence=confidence,
            )
            self._add_reasoning_step_to_extra_data(reasoning_step)

            formatted_content = f"## {title}\n{thought}\n"
            if action:
                formatted_content += f"Action: {action}\n"
            if confidence is not None:
                formatted_content += f"Confidence: {confidence}\n"
            formatted_content += "\n"

            self._append_to_reasoning_content(formatted_content)
            return reasoning_step

        # Case 2: ReasoningTools.analyze (has title, result, analysis, optional next_action and confidence)
        elif normalized_tool_name == "analyze" and "title" in tool_args:
            title = tool_args["title"]
            result = tool_args.get("result", "")
            analysis = tool_args.get("analysis", "")
            # Tool arguments come from the model and may carry an explicit null (or a
            # non-string) for next_action; normalize so `.lower()` below cannot raise.
            next_action = str(tool_args.get("next_action") or "")
            confidence = tool_args.get("confidence", None)

            # Map the free-form next_action string onto the NextAction enum.
            normalized_next_action = next_action.lower()
            if normalized_next_action == "validate":
                next_action_enum = NextAction.VALIDATE
            elif normalized_next_action in ("final", "final_answer", "finalize"):
                next_action_enum = NextAction.FINAL_ANSWER
            else:
                next_action_enum = NextAction.CONTINUE

            reasoning_step = ReasoningStep(
                title=title,
                result=result,
                reasoning=analysis,
                next_action=next_action_enum,
                confidence=confidence,
            )
            self._add_reasoning_step_to_extra_data(reasoning_step)

            formatted_content = f"## {title}\n"
            if result:
                formatted_content += f"Result: {result}\n"
            if analysis:
                formatted_content += f"{analysis}\n"
            # "continue" is the default flow, so it is not worth rendering.
            if next_action and normalized_next_action != "continue":
                formatted_content += f"Next Action: {next_action}\n"
            if confidence is not None:
                formatted_content += f"Confidence: {confidence}\n"
            formatted_content += "\n"

            self._append_to_reasoning_content(formatted_content)
            return reasoning_step

        # Case 3: ThinkingTools.think (simple format, just has 'thought')
        elif normalized_tool_name == "think" and "thought" in tool_args:
            thought = tool_args["thought"]
            reasoning_step = ReasoningStep(
                title="Thinking",
                reasoning=thought,
                confidence=None,
            )
            formatted_content = f"## Thinking\n{thought}\n\n"
            self._add_reasoning_step_to_extra_data(reasoning_step)
            self._append_to_reasoning_content(formatted_content)
            return reasoning_step

        return None

    def _append_to_reasoning_content(self, content: str) -> None:
        """Append ``content`` to ``self.run_response.reasoning_content``.

        The field is initialized with ``content`` when it is missing or empty. When there is
        no active run response, nothing happens; ``_add_reasoning_step_to_extra_data`` also
        skips silently in that case. Previously this path raised AttributeError on ``None``.

        Args:
            content: Text to append.
        """
        run_response = getattr(self, "run_response", None)
        if run_response is None:
            return

        existing = getattr(run_response, "reasoning_content", None)
        if not existing:
            run_response.reasoning_content = content  # type: ignore
        else:
            run_response.reasoning_content = existing + content  # type: ignore

    def _add_reasoning_step_to_extra_data(self, reasoning_step: ReasoningStep) -> None:
        """Record ``reasoning_step`` on the active run response's ``extra_data``.

        Creates ``extra_data`` (a RunResponseExtraData) and its ``reasoning_steps`` list on
        demand. Does nothing when there is no active run response.

        Args:
            reasoning_step: The step to append.
        """
        if not hasattr(self, "run_response") or self.run_response is None:
            return

        if self.run_response.extra_data is None:
            from agno.run.response import RunResponseExtraData

            self.run_response.extra_data = RunResponseExtraData()

        extra_data = self.run_response.extra_data
        if extra_data.reasoning_steps is None:
            extra_data.reasoning_steps = []
        extra_data.reasoning_steps.append(reasoning_step)

    def _add_reasoning_metrics_to_extra_data(self, reasoning_time_taken: float) -> None:
        """Append a reasoning-timing record to ``run_response.extra_data.reasoning_messages``.

        The record is an assistant ``Message``. Its content is the run's current
        ``reasoning_content`` and its metrics are ``{"time": reasoning_time_taken}``.
        ``extra_data`` and ``reasoning_messages`` are created on demand. Nothing happens when
        there is no active run response. Any exception is logged via ``log_error`` and
        swallowed, so this bookkeeping never aborts the caller.

        Args:
            reasoning_time_taken: Time spent reasoning, stored under the "time" metric key.
        """
        try:
            if not hasattr(self, "run_response") or self.run_response is None:
                return

            if self.run_response.extra_data is None:
                from agno.run.response import RunResponseExtraData

                self.run_response.extra_data = RunResponseExtraData()

            extra_data = self.run_response.extra_data
            if extra_data.reasoning_messages is None:
                extra_data.reasoning_messages = []

            extra_data.reasoning_messages.append(
                Message(
                    role="assistant",
                    content=self.run_response.reasoning_content,
                    metrics={"time": reasoning_time_taken},
                )
            )
        except Exception as e:
            # Best-effort: report the failure instead of propagating it.
            from agno.utils.log import log_error

            log_error(f"Failed to add reasoning metrics to extra_data: {str(e)}")

    def _get_effective_filters(self, knowledge_filters: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        """
        Determine which knowledge filters to use, with priority to run-level filters.

        Agent-level filters (``self.knowledge_filters``) form the base. Run-level filters are
        layered on top and override agent filters that share a key.

        Args:
            knowledge_filters: Filters passed at run time

        Returns:
            A new dict holding the effective filters, or None when neither the agent nor the
            run supplies any. The result never aliases ``self.knowledge_filters`` or the
            caller's ``knowledge_filters``, so downstream mutation cannot leak back into either.
        """
        effective_filters: Dict[str, Any] = {}

        # Agent filters as the base
        if self.knowledge_filters:
            effective_filters.update(self.knowledge_filters)

        # Run filters take priority on key collisions
        if knowledge_filters:
            effective_filters.update(knowledge_filters)

        if not effective_filters:
            return None

        log_debug(f"Using knowledge filters: {effective_filters}")
        return effective_filters

    def get_previous_sessions_messages_function(
        self, num_history_sessions: Optional[int] = 2, user_id: Optional[str] = None
    ) -> Callable:
        """Factory function to create a get_previous_session_messages function.

        Args:
            num_history_sessions: The last n sessions to be taken from db
            user_id: The user ID to get sessions for

        Returns:
            Callable: A function that retrieves messages from previous sessions
        """

        # NOTE: the inner function's name and docstring are kept verbatim. They are its runtime
        # __name__/__doc__ and are presumably used as a tool name/description; confirm at call site.
        def get_previous_session_messages() -> str:
            """Use this function to retrieve messages from previous chat sessions.
            USE THIS TOOL ONLY WHEN THE QUESTION IS EITHER "What was my last conversation?" or "What was my last question?" and similar to it.

            Returns:
                str: JSON formatted list of message pairs from previous sessions
            """
            import json

            if self.storage is None:
                return "Storage not available"

            selected_sessions = self.storage.get_recent_sessions(limit=num_history_sessions, user_id=user_id)

            all_messages = []
            # Identical (user, assistant) content pairs are only reported once across all runs.
            seen_message_pairs = set()

            for session in selected_sessions:
                if not (isinstance(session, AgentSession) and session.memory):
                    continue
                for run in session.memory.get("runs", []):
                    messages = run.get("messages", [])
                    # Messages are paired positionally: (0, 1), (2, 3), ...; a trailing unpaired
                    # message is ignored. NOTE(review): this assumes strict user/assistant
                    # alternation starting at index 0 — confirm against how runs are stored.
                    for i in range(0, len(messages) - 1, 2):
                        try:
                            user_msg = messages[i]
                            assistant_msg = messages[i + 1]
                            user_content = user_msg.get("content")
                            assistant_content = assistant_msg.get("content")

                            if user_content is None or assistant_content is None:
                                continue  # Skip this pair if either message has no content

                            msg_pair_id = f"{user_content}:{assistant_content}"
                            if msg_pair_id not in seen_message_pairs:
                                seen_message_pairs.add(msg_pair_id)
                                all_messages.append(Message.model_validate(user_msg))
                                all_messages.append(Message.model_validate(assistant_msg))
                        except Exception as e:
                            # Malformed entries are skipped rather than failing the whole lookup.
                            log_warning(f"Error processing message pair: {e}")
                            continue

            return json.dumps([msg.to_dict() for msg in all_messages]) if all_messages else "No history found"

        return get_previous_session_messages

    def cli_app(
        self,
        message: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        user: str = "User",
        emoji: str = ":sunglasses:",
        stream: bool = False,
        markdown: bool = False,
        exit_on: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run an interactive command-line interface to interact with the agent.

        If ``message`` is given, it is answered first. After that the user is prompted
        repeatedly until they enter one of ``exit_on`` (default: "exit", "quit", "bye").
        Every answer is rendered with ``print_response``.

        Raises:
            NotImplementedError: If a configured tool is awaitable or is an MCP toolkit;
                ``acli_app`` must be used for those.
        """
        from inspect import isawaitable

        from rich.prompt import Prompt

        # This loop is synchronous, so reject tools that need an event loop.
        for tool in self.tools or []:
            if isawaitable(tool):
                raise NotImplementedError("Use `acli_app` to use async tools.")
            if tool.__class__.__name__ in ("MCPTools", "MultiMCPTools"):
                raise NotImplementedError("Use `acli_app` to use MCP tools.")

        def _respond(text: Optional[str]) -> None:
            self.print_response(
                message=text, stream=stream, markdown=markdown, user_id=user_id, session_id=session_id, **kwargs
            )

        if message:
            _respond(message)

        stop_words = exit_on or ["exit", "quit", "bye"]
        while True:
            user_input = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
            if user_input in stop_words:
                return
            _respond(user_input)

    def get_agent_config_dict(self) -> Dict[str, Any]:
        """Build a JSON-friendly summary of this agent's configuration.

        The payload contains:

        - instructions
        - the function schemas from the tools prepared for the model
        - memory (class name, model, db), when memory has a db
        - storage and knowledge class names
        - the model identity
        - name and description

        Top-level keys whose value is None are dropped.

        Returns:
            Dict[str, Any]: The configuration payload.
        """
        tools: List[Dict[str, Any]] = []
        if self.tools is not None:
            # Tools for the model may not have been prepared (or the attribute even set) yet;
            # prepare them when both a model and a session id are available.
            if getattr(self, "_tools_for_model", None) is None:
                agent_model = self.model
                session_id = self.session_id
                if agent_model and session_id is not None:
                    self.determine_tools_for_model(model=agent_model, session_id=session_id)

            # The attribute can still be missing here (no model / no session id), so it is
            # read defensively rather than via direct attribute access.
            tools_for_model = getattr(self, "_tools_for_model", None)
            if tools_for_model is not None:
                for tool in tools_for_model:
                    if isinstance(tool, dict) and tool.get("type") == "function":
                        tools.append(tool["function"])

        model = None
        if self.model is not None:
            model = {
                "name": str(self.model.__class__.__name__),
                "model": str(self.model.id),
                "provider": str(self.model.provider),
            }

        memory = None
        if self.memory is not None and hasattr(self.memory, "db") and self.memory.db is not None:
            # Prefer the memory's own model; otherwise fall back to the agent's model.
            if hasattr(self.memory, "model") and self.memory.model is not None:
                memory_model = {
                    "name": self.memory.model.__class__.__name__,
                    "model": self.memory.model.id,
                    "provider": self.memory.model.provider,
                }
            elif self.model is not None:
                memory_model = {
                    "name": self.model.__class__.__name__,
                    "model": self.model.id,
                    "provider": self.model.provider,
                }
            else:
                memory_model = None

            memory_db = self.memory.db
            memory = {
                "name": self.memory.__class__.__name__,
                "model": memory_model,
                "db": {
                    "name": memory_db.__class__.__name__,
                    "table_name": getattr(memory_db, "table_name", None),
                    "db_url": getattr(memory_db, "db_url", None),
                },
            }

        payload = {
            "instructions": self.instructions if self.instructions is not None else [],
            "tools": tools,
            "memory": memory,
            "storage": {"name": self.storage.__class__.__name__} if self.storage is not None else None,
            "knowledge": {"name": self.knowledge.__class__.__name__} if self.knowledge is not None else None,
            "model": model,
            "name": self.name,
            "description": self.description,
        }
        return {k: v for k, v in payload.items() if v is not None}

    async def acli_app(
        self,
        message: Optional[str] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        user: str = "User",
        emoji: str = ":sunglasses:",
        stream: bool = False,
        markdown: bool = False,
        exit_on: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Run an interactive command-line interface to interact with the agent, asynchronously.

        Every answer is produced by awaiting ``aprint_response``. This makes it usable with
        agent dependencies and tools that require async logic (``cli_app`` points users here
        for async and MCP tools). If ``message`` is given it is answered first. The user is
        then prompted until they enter one of ``exit_on`` (default: "exit", "quit", "bye").
        """
        from rich.prompt import Prompt

        async def _respond(text: Optional[str]) -> None:
            await self.aprint_response(
                message=text, stream=stream, markdown=markdown, user_id=user_id, session_id=session_id, **kwargs
            )

        if message:
            await _respond(message)

        stop_words = exit_on or ["exit", "quit", "bye"]
        while True:
            # NOTE(review): Prompt.ask is a synchronous call, so the event loop is blocked
            # while waiting for user input.
            user_input = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
            if user_input in stop_words:
                return
            await _respond(user_input)
